Skip to content

Commit 77b67b0

Browse files
committed
Rely on blocked_executions.expires_at to select candidates to be released
Sempahores' expiry times are bumped when new jobs are successfully queued for execution or when jobs finish and release the semaphores. This means that if a blocked execution's expiry time is in the past, the semaphore's expiry time is most likely in the past too. Here's why: we know that if we still have a blocked job execution after its expiry time has passed, it's because: 1. A job holding the semaphore hasn't finished yet, and in that case, the semaphore's expiry time would have expired as well and would be cleared up right before checking blocked jobs. 2. The job holding the semaphore finished and released the semaphore but failed to unblock the next job. In that case, when we inspect the blocked job's concurrency key, we'll see the semaphore released. a. It's possible a new job is enqueued in the meantime and claims the semaphore, so we wouldn't be able to unblock that blocked job. However, if this happens, it's also more likely that this new job will succeed at unblocking it when it is finished. The more jobs that are enqueued and run, bumping the semaphore's expiry time, the more likely we are to unblock the blocked jobs via the normal method. 3. The job holding the semaphore finished but failed to release the semaphore: this case is the same as 1, the semaphore will be cleared before unblocking the execution. We take advantage of this to select X (scheduler's batch size) distinct concurrency keys from expired blocked executions, and for that we can use the index on (expires_at, concurrency_key), that filters the elements, even if we have to scan all of them to find the distcint concurrency keys using a temporary table that would get at most X items long. Then, we'll check whether these (up to) X concurrency keys are releasable and try to release them. A potential problem would be if we happen to select X concurrency keys that are expired but turn out not to be releasable. I think this should be very unlikely because for this to happen, we'd have to failed to unblock X jobs via the regular method and that other jobs using the same concurrency keys were enqueued, claiming the semaphore (case 2.a. described above), before we had the chance to unblock them, for all of them. We'd need two exceptional things to happen at once: a large backlog of concurrent jobs (using different keys) and a large amount of failed unblocked jobs. Thanks to @djmb for thinking through this with me and all the help!
1 parent 49f7d13 commit 77b67b0

File tree

5 files changed

+18
-11
lines changed

5 files changed

+18
-11
lines changed

app/models/solid_queue/blocked_execution.rb

+12-2
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ class BlockedExecution < SolidQueue::Execution
55

66
has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key
77

8-
scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
8+
scope :expired, -> { where(expires_at: ...Time.current) }
99

1010
class << self
1111
def unblock(count)
12-
release_many releasable.distinct.limit(count).pluck(:concurrency_key)
12+
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
13+
release_many releasable(concurrency_keys)
14+
end
1315
end
1416

1517
def release_many(concurrency_keys)
@@ -21,6 +23,14 @@ def release_many(concurrency_keys)
2123
def release_one(concurrency_key)
2224
ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release)
2325
end
26+
27+
private
28+
def releasable(concurrency_keys)
29+
semaphores = Semaphore.where(key: concurrency_keys).pluck(:key, :value).index_by(&:key)
30+
31+
# Concurrency keys without semaphore + concurrency keys with open semaphore
32+
(concurrency_keys - semaphores.keys) | semaphores.select { |key, value| value > 0 }.map(&:first)
33+
end
2434
end
2535

2636
def release

app/models/solid_queue/semaphore.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
class SolidQueue::Semaphore < SolidQueue::Record
22
scope :available, -> { where("value > 0") }
3-
scope :expired, -> { where(expires_at: ...Time.current)}
3+
scope :expired, -> { where(expires_at: ...Time.current) }
44

55
class << self
66
def wait(job)

lib/solid_queue.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ module SolidQueue
3434
mattr_accessor :supervisor, default: false
3535

3636
mattr_accessor :delete_finished_jobs, default: true
37-
mattr_accessor :default_concurrency_control_period, default: 15.minutes
37+
mattr_accessor :default_concurrency_control_period, default: 3.minutes
3838

3939
def self.supervisor?
4040
supervisor
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
class SequentialUpdateResultJob < UpdateResultJob
2-
limits_concurrency key: ->(job_result, **) { job_result }
2+
limits_concurrency key: ->(job_result, **) { job_result }, duration: 2.seconds
33
end

test/integration/concurrency_controls_test.rb

+3-6
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
106106
# the semaphore but hadn't unblocked any jobs
107107
assert SolidQueue::Semaphore.signal(job)
108108

109-
# And wait for workers to release the jobs
110-
wait_for_jobs_to_finish_for(3.seconds)
109+
# And wait for the scheduler to release the jobs
110+
wait_for_jobs_to_finish_for(5.seconds)
111111
assert_no_pending_jobs
112112

113113
# We can't ensure the order between B and C, because it depends on which worker wins when
@@ -133,11 +133,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
133133
end
134134
end
135135

136-
# Simulate semaphore expiration
137-
SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago)
138-
139136
# And wait for scheduler to release the jobs
140-
wait_for_jobs_to_finish_for(3.seconds)
137+
wait_for_jobs_to_finish_for(5.seconds)
141138
assert_no_pending_jobs
142139

143140
# We can't ensure the order between B and C, because it depends on which worker wins when

0 commit comments

Comments
 (0)