Skip to content

Commit 8a5de86

Browse files
committed
Fix bug when selecting releasable executions and make sure these are reported
Via SolidQueue.on_thread_error, in test environment.
1 parent 77b67b0 commit 8a5de86

File tree

7 files changed

+23
-11
lines changed

7 files changed

+23
-11
lines changed

app/models/solid_queue/blocked_execution.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@ def release_one(concurrency_key)
2626

2727
private
2828
def releasable(concurrency_keys)
29-
semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key)
29+
semaphores = Semaphore.where(key: concurrency_keys).select(:key, :value).index_by(&:key)
3030

3131
# Concurrency keys without semaphore + concurrency keys with open semaphore
32-
(concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first)
32+
(concurrency_keys - semaphores.keys) | semaphores.select { |key, semaphore| semaphore.value > 0 }.map(&:first)
3333
end
3434
end
3535

lib/solid_queue/scheduler.rb

+4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ def launch_concurrency_maintenance
3939
unblock_blocked_executions
4040
end
4141

42+
@concurrency_maintenance_task.add_observer do |_, _, error|
43+
handle_thread_error(error) if error
44+
end
45+
4246
@concurrency_maintenance_task.execute
4347
end
4448

Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
class SequentialUpdateResultJob < UpdateResultJob
2-
limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds
2+
limits_concurrency key: ->(job_result, **) { job_result }
33
end

test/dummy/config/application.rb

-2
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ class Application < Rails::Application
2828
# config.eager_load_paths << Rails.root.join("extras")
2929

3030
config.active_job.queue_adapter = :solid_queue
31-
32-
config.solid_queue.logger = ActiveSupport::Logger.new(nil)
3331
config.solid_queue.delete_finished_jobs = false
3432
end
3533
end

test/dummy/config/environments/development.rb

+2
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,6 @@
5656

5757
# Uncomment if you wish to allow Action Cable access from any origin.
5858
# config.action_cable.disable_request_forgery_protection = true
59+
60+
config.solid_queue.logger = ActiveSupport::Logger.new(nil)
5961
end

test/dummy/config/environments/test.rb

+4
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,8 @@
4747

4848
# Annotate rendered view with file names.
4949
# config.action_view.annotate_rendered_view_with_filenames = true
50+
51+
logger = ActiveSupport::Logger.new(STDOUT)
52+
config.solid_queue.on_thread_error = ->(exception) { logger.error("#{exception.class.name}: #{exception.message}\n#{exception.backtrace.join("\n")}") }
53+
config.solid_queue.logger = ActiveSupport::Logger.new(nil)
5054
end

test/integration/concurrency_controls_test.rb

+10-6
Original file line numberDiff line numberDiff line change
@@ -94,20 +94,20 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
9494
assert SolidQueue::Semaphore.wait(job)
9595
end
9696

97-
# Now enqueue more jobs under that same key. They'll be all locked. Use priorities
98-
# to ensure order.
97+
# Now enqueue more jobs under that same key. They'll be all locked
9998
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
10099
("B".."K").each do |name|
101100
SequentialUpdateResultJob.perform_later(@result, name: name)
102101
end
103102
end
104103

105-
# Then unlock the semaphore: this would be as if the first job had released
106-
# the semaphore but hadn't unblocked any jobs
104+
# Then unlock the semaphore and expire the jobs: this would be as if the first job had
105+
# released the semaphore but hadn't unblocked any jobs
106+
SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
107107
assert SolidQueue::Semaphore.signal(job)
108108

109109
# And wait for the scheduler to release the jobs
110-
wait_for_jobs_to_finish_for(5.seconds)
110+
wait_for_jobs_to_finish_for(3.seconds)
111111
assert_no_pending_jobs
112112

113113
# We can't ensure the order between B and C, because it depends on which worker wins when
@@ -133,8 +133,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
133133
end
134134
end
135135

136+
# Simulate expiration of semaphore and executions
137+
SolidQueue::Semaphore.where(key: job.concurrency_key).update_all(expires_at: 15.minutes.ago)
138+
SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
139+
136140
# And wait for scheduler to release the jobs
137-
wait_for_jobs_to_finish_for(5.seconds)
141+
wait_for_jobs_to_finish_for(3.seconds)
138142
assert_no_pending_jobs
139143

140144
# We can't ensure the order between B and C, because it depends on which worker wins when

0 commit comments

Comments
 (0)