Skip to content

Commit 5f5578d

Browse files
committed
Test concurrency limited to more than 1 job of a kind
1 parent b80c1a5 commit 5f5578d

File tree

5 files changed

+57
-9
lines changed

5 files changed

+57
-9
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class SequentialUpdateResultJob < UpdateResultJob
2+
include ActiveJob::ConcurrencyControls
3+
4+
limit_concurrency limit: 1, key: ->(job_result, **) { job_result }
5+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
class ThrottledUpdateResultJob < UpdateResultJob
2+
include ActiveJob::ConcurrencyControls
3+
4+
limit_concurrency limit: 3, key: ->(job_result, **) { job_result }
5+
end

test/dummy/app/jobs/update_result_job.rb

-2
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
class UpdateResultJob < ApplicationJob
22
include ActiveJob::ConcurrencyControls
33

4-
limit_concurrency limit: 1, key: ->(job_result, **) { job_result }
5-
64
def perform(job_result, name:, pause: nil)
75
job_result.status += "s#{name}"
86
job_result.save!

test/integration/concurrency_controls_test.rb

+36-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
99

1010
@result = JobResult.create!(queue_name: "default", status: "seq: ")
1111

12-
default_worker = { queues: "default", polling_interval: 1, processes: 3 }
12+
default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
1313
@pid = run_supervisor_as_fork(load_configuration_from: { workers: [ default_worker ] })
1414

1515
wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor
@@ -19,19 +19,51 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
1919
terminate_process(@pid) if process_exists?(@pid)
2020
end
2121

22-
test "run several conflicting jobs and prevent overlapping" do
22+
test "run several conflicting jobs over the same record sequentially" do
2323
("A".."F").each do |name|
24-
UpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
24+
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
2525
end
2626

2727
("G".."K").each do |name|
28-
UpdateResultJob.perform_later(@result, name: name)
28+
SequentialUpdateResultJob.perform_later(@result, name: name)
2929
end
3030

3131
wait_for_jobs_to_finish_for(4.seconds)
32+
assert_no_pending_jobs
33+
3234
assert_stored_sequence @result, ("A".."K").to_a
3335
end
3436

37+
test "run several jobs over the same record limiting concurrency" do
38+
incr = 0
39+
# C is the last one to update the record
40+
# A: 0 to 0.5
41+
# B: 0 to 1.0
42+
# C: 0 to 1.5
43+
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
44+
("A".."C").each do |name|
45+
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds)
46+
incr += 0.5
47+
end
48+
end
49+
50+
sleep(0.01) # To ensure these aren't picked up before ABC
51+
# D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25)
52+
# These would finish all before B and C
53+
assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do
54+
("D".."H").each do |name|
55+
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds)
56+
end
57+
end
58+
59+
wait_for_jobs_to_finish_for(3.seconds)
60+
assert_no_pending_jobs
61+
62+
# C would have started in the beginning, seeing the status empty, and would finish after
63+
# all other jobs, so it'll do the last update with only itself
64+
assert_stored_sequence(@result, [ "C" ])
65+
end
66+
3567
private
3668
def assert_stored_sequence(result, sequence)
3769
expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join

test/test_helper.rb

+11-3
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,22 @@ class ActiveSupport::TestCase
3737

3838
private
3939
def wait_for_jobs_to_finish_for(timeout = 1.second)
40-
Timeout.timeout(timeout) do
41-
while SolidQueue::Job.where(finished_at: nil).any? do
42-
sleep 0.05
40+
skip_active_record_query_cache do
41+
Timeout.timeout(timeout) do
42+
while SolidQueue::Job.where(finished_at: nil).any? do
43+
sleep 0.05
44+
end
4345
end
4446
end
4547
rescue Timeout::Error
4648
end
4749

50+
def assert_no_pending_jobs
51+
skip_active_record_query_cache do
52+
assert SolidQueue::Job.where(finished_at: nil).none?
53+
end
54+
end
55+
4856
def run_supervisor_as_fork(**options)
4957
fork do
5058
SolidQueue::Supervisor.start(**options)

0 commit comments

Comments
 (0)