Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve support for multiple schedulers #47

Merged
merged 7 commits into from
Nov 30, 2023
35 changes: 25 additions & 10 deletions app/models/solid_queue/scheduled_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,36 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution
assume_attributes_from_job :scheduled_at

class << self
def prepare_batch(batch)
prepared_at = Time.current
def prepare_next_batch(batch_size)
transaction do
batch = next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load)
prepare_batch batch
end
end

rows = batch.map do |scheduled_execution|
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
private
def prepare_batch(batch)
if batch.empty? then []
else
promote_batch_to_ready(batch)
end
end

if rows.any?
transaction do
SolidQueue::ReadyExecution.insert_all(rows)
where(id: batch.map(&:id)).delete_all
def promote_batch_to_ready(batch)
rows = ready_rows_from_batch(batch)

SolidQueue::ReadyExecution.insert_all(rows)
SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids|
where(job_id: enqueued_job_ids).delete_all
end
end

SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}")
end
def ready_rows_from_batch(batch)
prepared_at = Time.current

batch.map do |scheduled_execution|
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
end
end
end
end
2 changes: 1 addition & 1 deletion lib/solid_queue/interruptible.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def interrupt
end

def interruptible_sleep(time)
if self_pipe[:reader].wait_readable(time)
if time > 0 && self_pipe[:reader].wait_readable(time)
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
end
rescue Errno::EAGAIN, Errno::EINTR
Expand Down
9 changes: 9 additions & 0 deletions lib/solid_queue/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Runner

def start(mode: :supervised)
boot_in mode
observe_starting_delay

run_callbacks(:start) do
if mode == :async
Expand Down Expand Up @@ -44,6 +45,10 @@ def boot_in(mode)
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
end

def observe_starting_delay
interruptible_sleep(initial_jitter)
end

def register_signal_handlers
%w[ INT TERM ].each do |signal|
trap(signal) do
Expand Down Expand Up @@ -81,6 +86,10 @@ def shutdown
procline "shutting down"
end

def initial_jitter
0
end

def stopping?
@stopping
end
Expand Down
30 changes: 19 additions & 11 deletions lib/solid_queue/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ def initialize(**options)

private
def run
with_polling_volume do
batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load)
batch = prepare_next_batch

if batch.size > 0
procline "preparing #{batch.size} jobs for execution"
unless batch.size > 0
procline "waiting"
interruptible_sleep(polling_interval)
end
end

SolidQueue::ScheduledExecution.prepare_batch(batch)
else
procline "waiting"
interruptible_sleep(polling_interval)
end
def prepare_next_batch
with_polling_volume do
SolidQueue::ScheduledExecution.prepare_next_batch(batch_size)
end
end

Expand All @@ -51,11 +51,19 @@ def stop_concurrency_maintenance
end

def expire_semaphores
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
wrap_in_app_executor do
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
end
end

def unblock_blocked_executions
BlockedExecution.unblock(batch_size)
wrap_in_app_executor do
BlockedExecution.unblock(batch_size)
end
end

def initial_jitter
Kernel.rand(0...polling_interval)
end

def metadata
Expand Down
4 changes: 3 additions & 1 deletion test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })

wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + scheduler + supervisor
end

teardown do
Expand Down Expand Up @@ -89,6 +89,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs

wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
skip_active_record_query_cache do
assert SolidQueue::Semaphore.wait(job)
Expand Down Expand Up @@ -121,6 +122,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs

wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
skip_active_record_query_cache do
assert SolidQueue::Semaphore.wait(job)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/processes_lifecycle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
setup do
@pid = run_supervisor_as_fork

wait_for_registered_processes(3, timeout: 0.2.second)
wait_for_registered_processes(3, timeout: 1.second)
assert_registered_workers_for(:background, :default)
end

Expand Down
33 changes: 17 additions & 16 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,7 @@ class ActiveSupport::TestCase

private
def wait_for_jobs_to_finish_for(timeout = 1.second)
skip_active_record_query_cache do
Timeout.timeout(timeout) do
while SolidQueue::Job.where(finished_at: nil).any? do
sleep 0.05
end
end
end
rescue Timeout::Error
wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? }
end

def assert_no_pending_jobs
Expand All @@ -59,12 +52,7 @@ def run_supervisor_as_fork(**options)
end

def wait_for_registered_processes(count, timeout: 1.second)
Timeout.timeout(timeout) do
while SolidQueue::Process.count != count do
sleep 0.05
end
end
rescue Timeout::Error
wait_while_with_timeout(timeout) { SolidQueue::Process.count != count }
end

def assert_no_registered_processes
Expand All @@ -86,14 +74,27 @@ def terminate_process(pid, timeout: 10, signal: :TERM)

def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0)
Timeout.timeout(timeout) do
Process.waitpid(pid)
assert exitstatus, $?.exitstatus
if process_exists?(pid)
Process.waitpid(pid)
assert exitstatus, $?.exitstatus
end
end
rescue Timeout::Error
signal_process(pid, :KILL)
raise
end

def wait_while_with_timeout(timeout, &block)
Timeout.timeout(timeout) do
skip_active_record_query_cache do
while block.call
sleep 0.05
end
end
end
rescue Timeout::Error
end

def signal_process(pid, signal, wait: nil)
Thread.new do
sleep(wait) if wait
Expand Down
18 changes: 18 additions & 0 deletions test/unit/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,22 @@ class SchedulerTest < ActiveSupport::TestCase
ActiveRecord::Base.logger = old_logger
SolidQueue.silence_polling = old_silence_polling
end

test "run more than one instance of the scheduler" do
15.times do
AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled")
end
assert_equal 15, SolidQueue::ScheduledExecution.count

another_scheduler = SolidQueue::Scheduler.new(polling_interval: 0.1, batch_size: 10)
@scheduler.start(mode: :async)
another_scheduler.start(mode: :async)

sleep 0.5

assert_equal 0, SolidQueue::ScheduledExecution.count
assert_equal 15, SolidQueue::ReadyExecution.count

another_scheduler.stop
end
end