Skip to content

Commit 2201f65

Browse files
committed
Upgrade Worker.pool to Promises.future
* Upgrade Worker.pool from Future to Promises.future Promises.future leverages an improved, non-blocking, and lock-free implementation of Concurrent Ruby's Async runtime, enhancing performance and future feature compatibility. Promises.future was moved from edge to main in V1.1 (2018). * Replace ClaimedExecution::Results with Concurrent::Maybe `Results` was underutilized and essentially mirrored `Maybe`'s functionality. This change simplifies code and reduces redundancy by leveraging the `Concurrent::Maybe` class. * Centralize error reporting in AppExecutor.handle_thread_error This change was necessitated by the change to Promises.future. Concurrent Ruby has some very strong ideas about exceptions within a future with a little code rearranging, this change pulls the error / exception reporting responsibilities out of the Future execution code path and pushes it to AppExecutor#handle_thread_error. This change ensures that `Rails.error` is called exactly once per `handle_thread_error` invocation regardless of on_thread_error calling `Rails.error` or not. * Update tests to accommodate these changes
1 parent 0e88a8e commit 2201f65

File tree

9 files changed

+51
-39
lines changed

9 files changed

+51
-39
lines changed

app/models/solid_queue/claimed_execution.rb

+5-14
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,6 @@ class SolidQueue::ClaimedExecution < SolidQueue::Execution
55

66
scope :orphaned, -> { where.missing(:process) }
77

8-
class Result < Struct.new(:success, :error)
9-
def success?
10-
success
11-
end
12-
end
13-
148
class << self
159
def claiming(job_ids, process_id, &block)
1610
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
@@ -60,12 +54,9 @@ def discard_all_from_jobs(*)
6054
def perform
6155
result = execute
6256

63-
if result.success?
64-
finished
65-
else
66-
failed_with(result.error)
67-
raise result.error
68-
end
57+
result.just? ? finished : failed_with(result.reason)
58+
59+
result
6960
ensure
7061
job.unblock_next_blocked_job
7162
end
@@ -93,9 +84,9 @@ def failed_with(error)
9384
private
9485
def execute
9586
ActiveJob::Base.execute(job.arguments)
96-
Result.new(true, nil)
87+
Concurrent::Maybe.just(true)
9788
rescue Exception => e
98-
Result.new(false, e)
89+
Concurrent::Maybe.nothing(e)
9990
end
10091

10192
def finished

lib/solid_queue.rb

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module SolidQueue
4040
mattr_accessor :preserve_finished_jobs, default: true
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
43+
mattr_accessor :reporting_label, default: "SolidQueue-#{SolidQueue::VERSION}"
4344

4445
delegate :on_start, :on_stop, to: Supervisor
4546

lib/solid_queue/app_executor.rb

+31-4
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,38 @@ def wrap_in_app_executor(&block)
1111
end
1212

1313
def handle_thread_error(error)
14-
SolidQueue.instrument(:thread_error, error: error)
14+
CallErrorReporters.new(error).call
15+
end
16+
17+
private
18+
19+
# Handles error reporting and guarantees that Rails.error will be called if configured.
20+
#
21+
# This method performs the following actions:
22+
# 1. Invokes `SolidQueue.instrument` for `:thread_error`.
23+
# 2. Invokes `SolidQueue.on_thread_error` if it is configured.
24+
# 3. Invokes `Rails.error.report` if it wasn't invoked by one of the above calls.
25+
class CallErrorReporters
26+
# @param [Exception] error The error to be reported.
27+
def initialize(error)
28+
@error = error
29+
@reported = false
30+
end
31+
32+
def call
33+
SolidQueue.instrument(:thread_error, error: @error)
34+
Rails.error.subscribe(self) if Rails.error&.respond_to?(:subscribe)
1535

16-
if SolidQueue.on_thread_error
17-
SolidQueue.on_thread_error.call(error)
36+
SolidQueue.on_thread_error&.call(@error)
37+
38+
Rails.error.report(@error, handled: false, source: SolidQueue.reporting_label) unless @reported
39+
ensure
40+
Rails.error.unsubscribe(self) if Rails.error&.respond_to?(:unsubscribe)
41+
end
42+
43+
def report(*, **)
44+
@reported = true
45+
end
1846
end
19-
end
2047
end
2148
end

lib/solid_queue/engine.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class Engine < ::Rails::Engine
1818

1919
initializer "solid_queue.app_executor", before: :run_prepare_callbacks do |app|
2020
config.solid_queue.app_executor ||= app.executor
21-
config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false) }
21+
config.solid_queue.on_thread_error ||= ->(exception) { Rails.error.report(exception, handled: false, source: SolidQueue.reporting_label) }
2222

2323
SolidQueue.app_executor = config.solid_queue.app_executor
2424
SolidQueue.on_thread_error = config.solid_queue.on_thread_error

lib/solid_queue/log_subscriber.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def replace_fork(event)
162162

163163
private
164164
def formatted_event(event, action:, **attributes)
165-
"SolidQueue-#{SolidQueue::VERSION} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
165+
"#{SolidQueue.reporting_label} #{action} (#{event.duration.round(1)}ms) #{formatted_attributes(**attributes)}"
166166
end
167167

168168
def formatted_attributes(**attributes)

lib/solid_queue/pool.rb

+4-8
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,16 @@ def initialize(size, on_idle: nil)
1818
def post(execution)
1919
available_threads.decrement
2020

21-
future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
21+
Concurrent::Promises.future_on(executor, execution) do |thread_execution|
2222
wrap_in_app_executor do
23-
thread_execution.perform
23+
result = thread_execution.perform
24+
25+
handle_thread_error(result.reason) if result.rejected?
2426
ensure
2527
available_threads.increment
2628
mutex.synchronize { on_idle.try(:call) if idle? }
2729
end
2830
end
29-
30-
future.add_observer do |_, _, error|
31-
handle_thread_error(error) if error
32-
end
33-
34-
future.execute
3531
end
3632

3733
def idle_threads

test/integration/instrumentation_test.rb

+3-2
Original file line numberDiff line numberDiff line change
@@ -391,9 +391,10 @@ class InstrumentationTest < ActiveSupport::TestCase
391391

392392
test "thread errors emit thread_error events" do
393393
previous_thread_report_on_exception, Thread.report_on_exception = Thread.report_on_exception, false
394-
395394
error = ExpectedTestError.new("everything is broken")
396-
SolidQueue::ClaimedExecution::Result.expects(:new).raises(error).at_least_once
395+
396+
# Allows the job to process normally, but trigger the error path in ClaimedExecution.execute
397+
Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(error))
397398

398399
AddToBufferJob.perform_later "hey!"
399400

test/models/solid_queue/claimed_execution_test.rb

+4-8
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
2222
job = claimed_execution.job
2323

2424
assert_difference -> { SolidQueue::ClaimedExecution.count } => -1, -> { SolidQueue::FailedExecution.count } => 1 do
25-
assert_raises RuntimeError do
26-
claimed_execution.perform
27-
end
25+
claimed_execution.perform
2826
end
2927

3028
assert_not job.reload.finished?
@@ -39,12 +37,10 @@ class SolidQueue::ClaimedExecutionTest < ActiveSupport::TestCase
3937
test "job failures are reported via Rails error subscriber" do
4038
subscriber = ErrorBuffer.new
4139

42-
assert_raises RuntimeError do
43-
with_error_subscriber(subscriber) do
44-
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")
40+
with_error_subscriber(subscriber) do
41+
claimed_execution = prepare_and_claim_job RaisingJob.perform_later(RuntimeError, "B")
4542

46-
claimed_execution.perform
47-
end
43+
claimed_execution.perform
4844
end
4945

5046
assert_equal 1, subscriber.errors.count

test/unit/worker_test.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class WorkerTest < ActiveSupport::TestCase
5151
subscriber = ErrorBuffer.new
5252
Rails.error.subscribe(subscriber)
5353

54-
SolidQueue::ClaimedExecution::Result.expects(:new).raises(ExpectedTestError.new("everything is broken")).at_least_once
54+
Concurrent::Maybe.expects(:just).returns(Concurrent::Maybe.nothing(ExpectedTestError.new("everything is broken")))
5555

5656
AddToBufferJob.perform_later "hey!"
5757

0 commit comments

Comments
 (0)