Skip to content

Commit 32b2ed8

Browse files
committed
Specialize Dispatcher and Worker looping
The Worker and Dispatcher share the same poll loop logic (Poller#start_loop) while having different functional requirements. The Worker is poll looping despite not being able to execute new jobs if at capacity. The Dispatcher does require polling, but is reliant on shared logic in Poller#start_loop for a Dispatcher specific optimization. Changes: Move the logic controlling the sleep interval per poll from Poller#start_loop into Worker#poll and Dispatcher#poll by requiring #poll to return the `delay` value passed into interruptible_sleep. Poller#start_loop: * Removes the test based on the number of rows processed by #poll. This was Dispatcher specific logic. Worker#poll: * When Worker at full capacity: return a large value (10.minutes) effectively transforming Poller#start_loop from polling to wake-on-event. * When Worker < capacity: return `polling_interval` and maintain the poll timing until ReadyExecutions become available. Dispatcher#poll: * When `due` ScheduledExecutions.zero? return `polling_interval` and maintain the existing poll timing when no ScheduledExecutions are available to process. * When `due` ScheduledExecutions.postive? return 0. This results in interruptible_sleep(0) which returns immediately and without introducing any delays/sleeps between polls. This also allows for the existing behavior of looping through ScheduledExecutions via poll in order to check for shutdown requests between dispatch_next_batch interations.
1 parent 019d7c7 commit 32b2ed8

File tree

6 files changed

+61
-11
lines changed

6 files changed

+61
-11
lines changed

Diff for: lib/solid_queue/dispatcher.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ def metadata
2424
private
2525
def poll
2626
batch = dispatch_next_batch
27-
batch.size
27+
28+
batch.size.zero? ? polling_interval : 0.seconds
2829
end
2930

3031
def dispatch_next_batch

Diff for: lib/solid_queue/processes/interruptible.rb

+9-5
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ def interrupt
1212
queue << true
1313
end
1414

15+
# Sleeps for 'time'. Can be interrupted asynchronously and return early via wake_up.
16+
# @param time [Numeric] the time to sleep. 0 returns immediately.
17+
# @return [true, nil]
18+
# * returns `true` if an interrupt was requested via #wake_up between the
19+
# last call to `interruptible_sleep` and now, resulting in an early return.
20+
# * returns `nil` if it slept the full `time` and was not interrupted.
1521
def interruptible_sleep(time)
16-
# Invoking from the main thread can result in a 35% slowdown (at least when running the test suite).
17-
# Using some form of Async (Futures) addresses this performance issue.
22+
# Invoking this from the main thread may result in significant slowdown.
23+
# Utilizing asynchronous execution (Futures) addresses this performance issue.
1824
Concurrent::Promises.future(time) do |timeout|
19-
if timeout > 0 && queue.pop(timeout:)
20-
queue.clear
21-
end
25+
queue.pop(timeout:).tap { queue.clear }
2226
end.value
2327
end
2428

Diff for: lib/solid_queue/processes/poller.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@ def start_loop
2525
loop do
2626
break if shutting_down?
2727

28-
wrap_in_app_executor do
29-
unless poll > 0
30-
interruptible_sleep(polling_interval)
31-
end
28+
delay = wrap_in_app_executor do
29+
poll
3230
end
31+
32+
interruptible_sleep(delay)
3333
end
3434
ensure
3535
SolidQueue.instrument(:shutdown_process, process: self) do

Diff for: lib/solid_queue/worker.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class Worker < Processes::Poller
77
after_boot :run_start_hooks
88
before_shutdown :run_stop_hooks
99

10+
1011
attr_accessor :queues, :pool
1112

1213
def initialize(**options)
@@ -29,7 +30,7 @@ def poll
2930
pool.post(execution)
3031
end
3132

32-
executions.size
33+
pool.idle? ? polling_interval : 10.minutes
3334
end
3435
end
3536

Diff for: test/unit/dispatcher_test.rb

+24
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,30 @@ class DispatcherTest < ActiveSupport::TestCase
9292
another_dispatcher&.stop
9393
end
9494

95+
test "sleeps `0.seconds` between polls if there are ready to dispatch jobs" do
96+
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
97+
dispatcher.expects(:interruptible_sleep).with(0.seconds).at_least(3)
98+
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
99+
100+
3.times { AddToBufferJob.set(wait: 0.1).perform_later("I'm scheduled") }
101+
assert_equal 3, SolidQueue::ScheduledExecution.count
102+
sleep 0.1
103+
104+
dispatcher.start
105+
wait_while_with_timeout(1.second) { SolidQueue::ScheduledExecution.any? }
106+
107+
assert_equal 0, SolidQueue::ScheduledExecution.count
108+
assert_equal 3, SolidQueue::ReadyExecution.count
109+
end
110+
111+
test "sleeps `polling_interval` between polls if there are no un-dispatched jobs" do
112+
dispatcher = SolidQueue::Dispatcher.new(polling_interval: 10, batch_size: 1)
113+
dispatcher.expects(:interruptible_sleep).with(0.seconds).never
114+
dispatcher.expects(:interruptible_sleep).with(dispatcher.polling_interval).at_least_once
115+
dispatcher.start
116+
sleep 0.1
117+
end
118+
95119
private
96120
def with_polling(silence:)
97121
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence

Diff for: test/unit/worker_test.rb

+20
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,26 @@ class WorkerTest < ActiveSupport::TestCase
171171
SolidQueue.process_heartbeat_interval = old_heartbeat_interval
172172
end
173173

174+
test "sleeps `10.minutes` if at capacity" do
175+
3.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
176+
177+
@worker.expects(:interruptible_sleep).with(10.minutes).at_least_once
178+
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).never
179+
180+
@worker.start
181+
sleep 1.second
182+
end
183+
184+
test "sleeps `polling_interval` if worker not at capacity" do
185+
2.times { |i| StoreResultJob.perform_later(i, pause: 1.second) }
186+
187+
@worker.expects(:interruptible_sleep).with(@worker.polling_interval).at_least_once
188+
@worker.expects(:interruptible_sleep).with(10.minutes).never
189+
190+
@worker.start
191+
sleep 1.second
192+
end
193+
174194
private
175195
def with_polling(silence:)
176196
old_silence_polling, SolidQueue.silence_polling = SolidQueue.silence_polling, silence

0 commit comments

Comments
 (0)