Skip to content

Commit 722c09c

Browse files
authored
Merge pull request #47 from basecamp/multiple-schedulers
Improve support for multiple schedulers
2 parents 665bff1 + 442ee97 commit 722c09c

File tree

8 files changed: +93 -40 lines changed

8 files changed: +93 -40 lines changed

app/models/solid_queue/scheduled_execution.rb

+25 -10
Original file line number | Diff line number | Diff line change
@@ -6,21 +6,36 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution
66
assume_attributes_from_job :scheduled_at
77

88
class << self
9-
def prepare_batch(batch)
10-
prepared_at = Time.current
9+
def prepare_next_batch(batch_size)
10+
transaction do
11+
batch = next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load)
12+
prepare_batch batch
13+
end
14+
end
1115

12-
rows = batch.map do |scheduled_execution|
13-
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
16+
private
17+
def prepare_batch(batch)
18+
if batch.empty? then []
19+
else
20+
promote_batch_to_ready(batch)
21+
end
1422
end
1523

16-
if rows.any?
17-
transaction do
18-
SolidQueue::ReadyExecution.insert_all(rows)
19-
where(id: batch.map(&:id)).delete_all
24+
def promote_batch_to_ready(batch)
25+
rows = ready_rows_from_batch(batch)
26+
27+
SolidQueue::ReadyExecution.insert_all(rows)
28+
SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids|
29+
where(job_id: enqueued_job_ids).delete_all
2030
end
2131
end
2232

23-
SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}")
24-
end
33+
def ready_rows_from_batch(batch)
34+
prepared_at = Time.current
35+
36+
batch.map do |scheduled_execution|
37+
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
38+
end
39+
end
2540
end
2641
end

lib/solid_queue/interruptible.rb

+1 -1
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@ def interrupt
1717
end
1818

1919
def interruptible_sleep(time)
20-
if self_pipe[:reader].wait_readable(time)
20+
if time > 0 && self_pipe[:reader].wait_readable(time)
2121
loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) }
2222
end
2323
rescue Errno::EAGAIN, Errno::EINTR

lib/solid_queue/runner.rb

+9
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@ module Runner
1111

1212
def start(mode: :supervised)
1313
boot_in mode
14+
observe_starting_delay
1415

1516
run_callbacks(:start) do
1617
if mode == :async
@@ -44,6 +45,10 @@ def boot_in(mode)
4445
SolidQueue.logger.info("[SolidQueue] Starting #{self}")
4546
end
4647

48+
def observe_starting_delay
49+
interruptible_sleep(initial_jitter)
50+
end
51+
4752
def register_signal_handlers
4853
%w[ INT TERM ].each do |signal|
4954
trap(signal) do
@@ -81,6 +86,10 @@ def shutdown
8186
procline "shutting down"
8287
end
8388

89+
def initial_jitter
90+
0
91+
end
92+
8493
def stopping?
8594
@stopping
8695
end

lib/solid_queue/scheduler.rb

+19 -11
Original file line number | Diff line number | Diff line change
@@ -19,17 +19,17 @@ def initialize(**options)
1919

2020
private
2121
def run
22-
with_polling_volume do
23-
batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load)
22+
batch = prepare_next_batch
2423

25-
if batch.size > 0
26-
procline "preparing #{batch.size} jobs for execution"
24+
unless batch.size > 0
25+
procline "waiting"
26+
interruptible_sleep(polling_interval)
27+
end
28+
end
2729

28-
SolidQueue::ScheduledExecution.prepare_batch(batch)
29-
else
30-
procline "waiting"
31-
interruptible_sleep(polling_interval)
32-
end
30+
def prepare_next_batch
31+
with_polling_volume do
32+
SolidQueue::ScheduledExecution.prepare_next_batch(batch_size)
3333
end
3434
end
3535

@@ -51,11 +51,19 @@ def stop_concurrency_maintenance
5151
end
5252

5353
def expire_semaphores
54-
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
54+
wrap_in_app_executor do
55+
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
56+
end
5557
end
5658

5759
def unblock_blocked_executions
58-
BlockedExecution.unblock(batch_size)
60+
wrap_in_app_executor do
61+
BlockedExecution.unblock(batch_size)
62+
end
63+
end
64+
65+
def initial_jitter
66+
Kernel.rand(0...polling_interval)
5967
end
6068

6169
def metadata

test/integration/concurrency_controls_test.rb

+3 -1
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
1414

1515
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })
1616

17-
wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
17+
wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + scheduler + supervisor
1818
end
1919

2020
teardown do
@@ -89,6 +89,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8989
wait_for_jobs_to_finish_for(3.seconds)
9090
assert_no_pending_jobs
9191

92+
wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
9293
# Lock the semaphore so we can enqueue jobs and leave them blocked
9394
skip_active_record_query_cache do
9495
assert SolidQueue::Semaphore.wait(job)
@@ -121,6 +122,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
121122
wait_for_jobs_to_finish_for(3.seconds)
122123
assert_no_pending_jobs
123124

125+
wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
124126
# Lock the semaphore so we can enqueue jobs and leave them blocked
125127
skip_active_record_query_cache do
126128
assert SolidQueue::Semaphore.wait(job)

test/integration/processes_lifecycle_test.rb

+1 -1
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
88
setup do
99
@pid = run_supervisor_as_fork
1010

11-
wait_for_registered_processes(3, timeout: 0.2.second)
11+
wait_for_registered_processes(3, timeout: 1.second)
1212
assert_registered_workers_for(:background, :default)
1313
end
1414

test/test_helper.rb

+17 -16
Original file line number | Diff line number | Diff line change
@@ -36,14 +36,7 @@ class ActiveSupport::TestCase
3636

3737
private
3838
def wait_for_jobs_to_finish_for(timeout = 1.second)
39-
skip_active_record_query_cache do
40-
Timeout.timeout(timeout) do
41-
while SolidQueue::Job.where(finished_at: nil).any? do
42-
sleep 0.05
43-
end
44-
end
45-
end
46-
rescue Timeout::Error
39+
wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? }
4740
end
4841

4942
def assert_no_pending_jobs
@@ -59,12 +52,7 @@ def run_supervisor_as_fork(**options)
5952
end
6053

6154
def wait_for_registered_processes(count, timeout: 1.second)
62-
Timeout.timeout(timeout) do
63-
while SolidQueue::Process.count != count do
64-
sleep 0.05
65-
end
66-
end
67-
rescue Timeout::Error
55+
wait_while_with_timeout(timeout) { SolidQueue::Process.count != count }
6856
end
6957

7058
def assert_no_registered_processes
@@ -86,14 +74,27 @@ def terminate_process(pid, timeout: 10, signal: :TERM)
8674

8775
def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0)
8876
Timeout.timeout(timeout) do
89-
Process.waitpid(pid)
90-
assert exitstatus, $?.exitstatus
77+
if process_exists?(pid)
78+
Process.waitpid(pid)
79+
assert exitstatus, $?.exitstatus
80+
end
9181
end
9282
rescue Timeout::Error
9383
signal_process(pid, :KILL)
9484
raise
9585
end
9686

87+
def wait_while_with_timeout(timeout, &block)
88+
Timeout.timeout(timeout) do
89+
skip_active_record_query_cache do
90+
while block.call
91+
sleep 0.05
92+
end
93+
end
94+
end
95+
rescue Timeout::Error
96+
end
97+
9798
def signal_process(pid, signal, wait: nil)
9899
Thread.new do
99100
sleep(wait) if wait

test/unit/scheduler_test.rb

+18
Original file line number | Diff line number | Diff line change
@@ -39,4 +39,22 @@ class SchedulerTest < ActiveSupport::TestCase
3939
ActiveRecord::Base.logger = old_logger
4040
SolidQueue.silence_polling = old_silence_polling
4141
end
42+
43+
test "run more than one instance of the scheduler" do
44+
15.times do
45+
AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled")
46+
end
47+
assert_equal 15, SolidQueue::ScheduledExecution.count
48+
49+
another_scheduler = SolidQueue::Scheduler.new(polling_interval: 0.1, batch_size: 10)
50+
@scheduler.start(mode: :async)
51+
another_scheduler.start(mode: :async)
52+
53+
sleep 0.5
54+
55+
assert_equal 0, SolidQueue::ScheduledExecution.count
56+
assert_equal 15, SolidQueue::ReadyExecution.count
57+
58+
another_scheduler.stop
59+
end
4260
end

0 commit comments

Comments
 (0)