Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 43f4de8

Browse files
committedDec 4, 2024·
Use Concurrent::Promises based TimerTask
Promises are the recommended infrastructure, replacing several OG APIs, including TimerTasks. SQ only uses TimerTasks in 3 places (currently) and a very small subset of their overall functionality. SolidQueue::TimerTask is a drop-in replacement. This PR uses AtomicBoolean instead of the recommended Concurrent::Cancellation to avoid a dependency on, and the potential API stability issues of, edge features. This completes the move from the old APIs to Promises and makes all of the new concurrency features (Actors, Channels, etc.) available for future SQ features and enahancements.
1 parent 5d40217 commit 43f4de8

File tree

6 files changed

+125
-23
lines changed

6 files changed

+125
-23
lines changed
 

‎Gemfile.lock

-2
Original file line numberDiff line numberDiff line change
@@ -130,8 +130,6 @@ GEM
130130
rake (13.2.1)
131131
rdoc (6.8.1)
132132
psych (>= 4.0.0)
133-
rdoc (6.6.3.1)
134-
psych (>= 4.0.0)
135133
regexp_parser (2.9.2)
136134
reline (0.5.12)
137135
io-console (~> 0.5)

‎lib/solid_queue/dispatcher/concurrency_maintenance.rb

+1-7
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,10 @@ def initialize(interval, batch_size)
1212
end
1313

1414
def start
15-
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: interval) do
15+
@concurrency_maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: interval) do
1616
expire_semaphores
1717
unblock_blocked_executions
1818
end
19-
20-
@concurrency_maintenance_task.add_observer do |_, _, error|
21-
handle_thread_error(error) if error
22-
end
23-
24-
@concurrency_maintenance_task.execute
2519
end
2620

2721
def stop

‎lib/solid_queue/processes/registrable.rb

+1-7
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,9 @@ def registered?
3737
end
3838

3939
def launch_heartbeat
40-
@heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
40+
@heartbeat_task = SolidQueue::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) do
4141
wrap_in_app_executor { heartbeat }
4242
end
43-
44-
@heartbeat_task.add_observer do |_, _, error|
45-
handle_thread_error(error) if error
46-
end
47-
48-
@heartbeat_task.execute
4943
end
5044

5145
def stop_heartbeat

‎lib/solid_queue/supervisor/maintenance.rb

+2-7
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,11 @@ module Supervisor::Maintenance
77
end
88

99
private
10+
1011
def launch_maintenance_task
11-
@maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
12+
@maintenance_task = SolidQueue::TimerTask.new(run_now: true, execution_interval: SolidQueue.process_alive_threshold) do
1213
prune_dead_processes
1314
end
14-
15-
@maintenance_task.add_observer do |_, _, error|
16-
handle_thread_error(error) if error
17-
end
18-
19-
@maintenance_task.execute
2015
end
2116

2217
def stop_maintenance_task

‎lib/solid_queue/timer_task.rb

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class TimerTask
5+
include AppExecutor
6+
7+
def initialize(execution_interval:, run_now: false, &block)
8+
raise ArgumentError, "A block is required" unless block_given?
9+
@shutdown = Concurrent::AtomicBoolean.new
10+
11+
run(run_now, execution_interval, &block)
12+
end
13+
14+
def shutdown
15+
@shutdown.make_true
16+
end
17+
18+
private
19+
20+
def run(run_now, execution_interval, &block)
21+
execute_task(&block) if run_now
22+
23+
Concurrent::Promises.future(execution_interval) do |interval|
24+
repeating_task(interval, &block)
25+
end.run
26+
end
27+
28+
def execute_task(&block)
29+
block.call unless @shutdown.true?
30+
rescue Exception => e
31+
handle_thread_error(e)
32+
end
33+
34+
def repeating_task(interval, &block)
35+
Concurrent::Promises.schedule(interval) do
36+
execute_task(&block)
37+
end.then do
38+
repeating_task(interval, &block) unless @shutdown.true?
39+
end
40+
end
41+
end
42+
end

‎test/unit/timer_task_test.rb

+79
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
require "mocha/minitest"
5+
6+
class TimerTaskTest < ActiveSupport::TestCase
7+
test "initialization requires a block" do
8+
assert_raises(ArgumentError) do
9+
SolidQueue::TimerTask.new(execution_interval: 1)
10+
end
11+
end
12+
13+
test "task runs immediate when run now true" do
14+
executed = false
15+
16+
task = SolidQueue::TimerTask.new(run_now: true, execution_interval: 1) do
17+
executed = true
18+
end
19+
20+
sleep 0.1
21+
22+
assert executed, "Task should have executed immediately"
23+
task.shutdown
24+
end
25+
26+
test "task does not run immediately when run with run_now false" do
27+
executed = false
28+
29+
task = SolidQueue::TimerTask.new(run_now: false, execution_interval: 1) do
30+
executed = true
31+
end
32+
33+
sleep 0.1
34+
35+
assert_not executed, "Task should have executed immediately"
36+
task.shutdown
37+
end
38+
39+
test "task repeats" do
40+
executions = 0
41+
42+
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) do
43+
executions += 1
44+
end
45+
46+
sleep(0.5) # Wait to accumulate some executions
47+
48+
assert executions > 3, "The block should be executed repeatedly"
49+
50+
task.shutdown
51+
end
52+
53+
test "task stops on shutdown" do
54+
executions = 0
55+
56+
task = SolidQueue::TimerTask.new(execution_interval: 0.1, run_now: false) { executions += 1 }
57+
58+
sleep(0.3) # Let the task run a few times
59+
60+
task.shutdown
61+
62+
current_executions = executions
63+
64+
sleep(0.5) # Ensure no more executions after shutdown
65+
66+
assert_equal current_executions, executions, "The task should stop executing after shutdown"
67+
end
68+
69+
test "calls handle_thread_error if task raises" do
70+
task = SolidQueue::TimerTask.new(execution_interval: 0.1) do
71+
raise ExpectedTestError.new
72+
end
73+
task.expects(:handle_thread_error).with(instance_of(ExpectedTestError))
74+
75+
sleep(0.2) # Give some time for the task to run and handle the error
76+
77+
task.shutdown
78+
end
79+
end

0 commit comments

Comments
 (0)
Please sign in to comment.