Skip to content

Commit b80c1a5

Browse files
committed
Dispatch blocked executions when a concurrency-limited job finishes
Either successfully or failing, and also release the semaphore.
1 parent 1713b14 commit b80c1a5

File tree

10 files changed

+120
-36
lines changed

10 files changed

+120
-36
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
11
class SolidQueue::BlockedExecution < SolidQueue::Execution
22
assume_attributes_from_job :concurrency_limit, :concurrency_key
3+
4+
def self.release(concurrency_key)
5+
where(concurrency_key: concurrency_key).order(:priority).first&.release
6+
end
7+
8+
def release
9+
transaction do
10+
job.prepare_for_execution
11+
destroy!
12+
end
13+
end
314
end

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,24 @@ def acquire_concurrency_lock
1414
Semaphore.wait_for(concurrency_key, concurrency_limit)
1515
end
1616

17+
def release_concurrency_lock
18+
return false unless concurrency_limited?
19+
20+
Semaphore.release(concurrency_key)
21+
end
22+
1723
def block
1824
BlockedExecution.create_or_find_by!(job_id: id)
1925
end
2026

27+
def release_next_blocked_job
28+
BlockedExecution.release(concurrency_key)
29+
end
30+
2131
def concurrency_limited?
2232
concurrency_limit.to_i > 0 && concurrency_key.present?
2333
end
34+
2435
end
2536
end
2637
end

app/models/solid_queue/job/executable.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ def prepare_for_execution
2828

2929
def finished
3030
touch(:finished_at)
31+
dispatch_blocked_jobs
3132
end
3233

3334
def finished?
@@ -36,6 +37,7 @@ def finished?
3637

3738
def failed_with(exception)
3839
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
40+
dispatch_blocked_jobs
3941
end
4042

4143
def discard
@@ -58,6 +60,12 @@ def dispatch
5860
end
5961
end
6062

63+
def dispatch_blocked_jobs
64+
if release_concurrency_lock
65+
release_next_blocked_job
66+
end
67+
end
68+
6169
def schedule
6270
ScheduledExecution.create_or_find_by!(job_id: id)
6371
end

app/models/solid_queue/semaphore.rb

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
class SolidQueue::Semaphore < SolidQueue::Record
2-
def self.wait_for(identifier, limit)
3-
if semaphore = find_by(identifier: identifier)
4-
semaphore.value > 0 && attempt_to_update(identifier)
5-
else
6-
attempt_to_create(identifier, limit)
2+
class << self
3+
def wait_for(identifier, limit)
4+
if semaphore = find_by(identifier: identifier)
5+
semaphore.value > 0 && attempt_to_update(identifier)
6+
else
7+
attempt_to_create(identifier, limit)
8+
end
79
end
8-
end
910

10-
def self.attempt_to_create(identifier, limit)
11-
create!(identifier: identifier, value: limit - 1)
12-
true
13-
rescue ActiveRecord::RecordNotUnique
14-
attempt_to_update(identifier)
15-
end
11+
def attempt_to_create(identifier, limit)
12+
create!(identifier: identifier, value: limit - 1)
13+
true
14+
rescue ActiveRecord::RecordNotUnique
15+
attempt_to_update(identifier)
16+
end
1617

17-
def self.attempt_to_update(identifier)
18-
where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0
18+
def attempt_to_update(identifier)
19+
where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0
20+
end
21+
22+
def release(identifier)
23+
where(identifier: identifier).update_all("value = COALESCE(value, 0) + 1") > 0
24+
end
1925
end
2026
end

lib/active_job/concurrency_controls.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module ConcurrencyControls
77
DEFAULT_CONCURRENCY_KEY = ->(*) { self.name }
88

99
included do
10-
class_attribute :concurrency_limit, default: 1
10+
class_attribute :concurrency_limit, default: 0 # No limit
1111
class_attribute :concurrency_key, default: DEFAULT_CONCURRENCY_KEY, instance_accessor: false
1212
end
1313

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
11
class UpdateResultJob < ApplicationJob
2-
def perform(job_result, name:, pause: 0.1)
3-
job_result.update!(status: "started_#{name}")
4-
sleep(pause)
5-
job_result.update!(status: "completed_#{name}")
2+
include ActiveJob::ConcurrencyControls
3+
4+
limit_concurrency limit: 1, key: ->(job_result, **) { job_result }
5+
6+
def perform(job_result, name:, pause: nil)
7+
job_result.status += "s#{name}"
8+
job_result.save!
9+
10+
sleep(pause) if pause
11+
12+
job_result.status += "c#{name}"
13+
job_result.save!
614
end
715
end
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
require "test_helper"
3+
4+
class ConcurrencyControlsTest < ActiveSupport::TestCase
5+
self.use_transactional_tests = false
6+
7+
setup do
8+
SolidQueue::Job.delete_all
9+
10+
@result = JobResult.create!(queue_name: "default", status: "seq: ")
11+
12+
default_worker = { queues: "default", polling_interval: 1, processes: 3 }
13+
@pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] })
14+
15+
wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
16+
end
17+
18+
teardown do
19+
terminate_process(@pid) if process_exists?(@pid)
20+
end
21+
22+
test "run several conflicting jobs and prevent overlapping" do
23+
("A".."F").each do |name|
24+
UpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
25+
end
26+
27+
("G".."K").each do |name|
28+
UpdateResultJob.perform_later(@result, name: name)
29+
end
30+
31+
wait_for_jobs_to_finish_for(4.seconds)
32+
assert_stored_sequence @result, ("A".."K").to_a
33+
end
34+
35+
private
36+
def assert_stored_sequence(result, sequence)
37+
expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join
38+
skip_active_record_query_cache do
39+
assert_equal expected, result.reload.status
40+
end
41+
end
42+
end

test/integration/jobs_lifecycle_test.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
class JobsLifecycleTest < ActiveSupport::TestCase
55
setup do
6-
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 1)
6+
@worker = SolidQueue::Worker.new(queues: "background", threads: 3, polling_interval: 0.5)
77
@scheduler = SolidQueue::Scheduler.new(batch_size: 10, polling_interval: 1)
88
end
99

@@ -37,14 +37,14 @@ class JobsLifecycleTest < ActiveSupport::TestCase
3737

3838
travel_to 2.days.from_now
3939

40-
wait_for_jobs_to_finish_for(5.seconds)
40+
wait_for_jobs_to_finish_for(2.seconds)
4141

4242
assert_equal 1, JobBuffer.size
4343
assert_equal "I'm scheduled", JobBuffer.last_value
4444

4545
travel_to 5.days.from_now
4646

47-
wait_for_jobs_to_finish_for(5.seconds)
47+
wait_for_jobs_to_finish_for(2.seconds)
4848

4949
assert_equal 2, JobBuffer.size
5050
assert_equal "I'm scheduled later", JobBuffer.last_value

test/integration/processes_lifecycle_test.rb

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
1313
end
1414

1515
teardown do
16-
terminate_supervisor if process_exists?(@pid)
16+
terminate_process(@pid) if process_exists?(@pid)
1717
end
1818

1919
test "enqueue jobs in multiple queues" do
@@ -26,7 +26,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
2626
6.times { |i| assert_completed_job_results("job_#{i}", :background) }
2727
6.times { |i| assert_completed_job_results("job_#{i}", :default) }
2828

29-
terminate_supervisor
29+
terminate_process(@pid)
3030
assert_clean_termination
3131
end
3232

@@ -150,7 +150,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
150150
assert_job_status(job, :failed)
151151
end
152152

153-
terminate_supervisor
153+
terminate_process(@pid)
154154
assert_clean_termination
155155
end
156156

@@ -175,17 +175,12 @@ class ProcessLifecycleTest < ActiveSupport::TestCase
175175
end
176176

177177
assert process_exists?(@pid)
178-
179-
terminate_supervisor
178+
terminate_process(@pid)
180179

181180
assert_clean_termination
182181
end
183182

184183
private
185-
def terminate_supervisor
186-
terminate_process(@pid)
187-
end
188-
189184
def terminate_registered_processes
190185
skip_active_record_query_cache do
191186
SolidQueue::Process.find_each do |process|

test/models/solid_queue/job_test.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
require "test_helper"
22

33
class SolidQueue::JobTest < ActiveSupport::TestCase
4-
class NonOverlappingJob < UpdateResultJob
4+
class NonOverlappingJob < ApplicationJob
55
include ActiveJob::ConcurrencyControls
66

77
limit_concurrency limit: 1, key: ->(job_result, **) { job_result }
8+
9+
def perform(job_result)
10+
end
11+
end
12+
13+
setup do
14+
@result = JobResult.create!(queue_name: "default")
815
end
916

1017
test "enqueue active job to be executed right away" do
@@ -48,10 +55,6 @@ class NonOverlappingJob < UpdateResultJob
4855
assert Time.now < execution.scheduled_at
4956
end
5057

51-
setup do
52-
@result = JobResult.create!(queue_name: "default")
53-
end
54-
5558
test "enqueue jobs with concurrency controls" do
5659
active_job = NonOverlappingJob.perform_later(@result, name: "A")
5760
assert_equal 1, active_job.concurrency_limit

0 commit comments

Comments
 (0)