Skip to content

Commit 87f7861

Browse files
committed
Use a concurrent timer task to expire semaphores and unblock executions
This task is managed by the scheduler and uses its same polling interval to run.
1 parent 3f87333 commit 87f7861

File tree

5 files changed

+60
-35
lines changed

5 files changed

+60
-35
lines changed

app/models/solid_queue/blocked_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution
44

55
has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key
66

7-
scope :releasable, -> { left_outer_joins(:execution_semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
7+
scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
88
scope :ordered, -> { order(priority: :asc) }
99

1010
class << self

app/models/solid_queue/semaphore.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
class SolidQueue::Semaphore < SolidQueue::Record
22
scope :available, -> { where("value > 0") }
33
scope :locked, -> { where(value: 0) }
4+
scope :expired, -> { where(expires_at: ...Time.current)}
45

56
class << self
67
def wait_for(concurrency_key, limit, duration)

lib/solid_queue/process_registration.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ module ProcessRegistration
99
define_callbacks :start, :run, :shutdown
1010

1111
set_callback :start, :before, :register
12-
set_callback :start, :before, :start_heartbeat
12+
set_callback :start, :before, :launch_heartbeat
1313

1414
set_callback :run, :after, -> { stop unless registered? }
1515

@@ -43,7 +43,7 @@ def registered?
4343
process.persisted?
4444
end
4545

46-
def start_heartbeat
46+
def launch_heartbeat
4747
@heartbeat_task = Concurrent::TimerTask.new(execution_interval: SolidQueue.process_heartbeat_interval) { heartbeat }
4848
@heartbeat_task.execute
4949
end

lib/solid_queue/scheduler.rb

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,60 @@
11
# frozen_string_literal: true
22

3-
class SolidQueue::Scheduler
4-
include SolidQueue::Runner
3+
module SolidQueue
4+
class Scheduler
5+
include Runner
56

6-
attr_accessor :batch_size, :polling_interval
7+
attr_accessor :batch_size, :polling_interval
78

8-
def initialize(**options)
9-
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
9+
set_callback :start, :before, :launch_concurrency_maintenance
10+
set_callback :shutdown, :before, :stop_concurrency_maintenance
1011

11-
@batch_size = options[:batch_size]
12-
@polling_interval = options[:polling_interval]
13-
end
12+
def initialize(**options)
13+
options = options.dup.with_defaults(SolidQueue::Configuration::SCHEDULER_DEFAULTS)
14+
15+
@batch_size = options[:batch_size]
16+
@polling_interval = options[:polling_interval]
17+
end
1418

15-
private
16-
def run
17-
with_polling_volume do
18-
batch = SolidQueue::ScheduledExecution.next_batch(batch_size)
19+
private
20+
def run
21+
with_polling_volume do
22+
batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load)
1923

20-
if batch.size > 0
21-
procline "preparing #{batch.size} jobs for execution"
24+
if batch.size > 0
25+
procline "preparing #{batch.size} jobs for execution"
2226

23-
SolidQueue::ScheduledExecution.prepare_batch(batch)
24-
else
25-
procline "waiting"
26-
interruptible_sleep(polling_interval)
27+
SolidQueue::ScheduledExecution.prepare_batch(batch)
28+
else
29+
procline "waiting"
30+
interruptible_sleep(polling_interval)
31+
end
2732
end
2833
end
29-
end
3034

31-
def metadata
32-
super.merge(batch_size: batch_size, polling_interval: polling_interval)
33-
end
35+
def launch_concurrency_maintenance
36+
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: polling_interval) do
37+
expire_semaphores
38+
unblock_blocked_executions
39+
end
40+
41+
@concurrency_maintenance_task.execute
42+
end
43+
44+
def stop_concurrency_maintenance
45+
@concurrency_maintenance_task.shutdown
46+
end
47+
48+
def expire_semaphores
49+
Semaphore.expired.in_batches(of: batch_size, &:delete_all)
50+
end
51+
52+
def unblock_blocked_executions
53+
BlockedExecution.unblock(batch_size)
54+
end
55+
56+
def metadata
57+
super.merge(batch_size: batch_size, polling_interval: polling_interval)
58+
end
59+
end
3460
end

test/integration/concurrency_controls_test.rb

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
1010
@result = JobResult.create!(queue_name: "default", status: "seq: ")
1111

1212
default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
13-
@pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] })
13+
scheduler = { polling_interval: 1, batch_size: 200 }
14+
15+
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })
1416

1517
wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
1618
end
@@ -80,9 +82,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8082
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
8183
end
8284

83-
test "rely on worker to unblock blocked executions with an available semaphore" do
84-
skip "Moving this task to the supervisor"
85-
85+
test "rely on scheduler to unblock blocked executions with an available semaphore" do
8686
# Simulate a scenario where we got an available semaphore and some stuck jobs
8787
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
8888

@@ -115,9 +115,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
115115
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
116116
end
117117

118-
test "rely on worker to unblock blocked executions with a missing semaphore" do
119-
skip "Moving this task to the supervisor"
120-
118+
test "rely on scheduler to unblock blocked executions with an expired semaphore" do
121119
# Simulate a scenario where we got an available semaphore and some stuck jobs
122120
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
123121
wait_for_jobs_to_finish_for(2.seconds)
@@ -135,10 +133,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
135133
end
136134
end
137135

138-
# Then delete the semaphore, as if we had cleared it
139-
SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).destroy!
136+
# Simulate semaphore expiration
137+
SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago)
140138

141-
# And wait for workers to release the jobs
139+
# And wait for scheduler to release the jobs
142140
wait_for_jobs_to_finish_for(2.seconds)
143141
assert_no_pending_jobs
144142

0 commit comments

Comments
 (0)