Skip to content

Commit 1579e75

Browse files
authored
Merge pull request #38 from basecamp/concurrency-controls
Introduce concurrency controls
2 parents 6367062 + 8a5de86 commit 1579e75

31 files changed

+700
-122
lines changed

Diff for: app/models/solid_queue/blocked_execution.rb

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
module SolidQueue
2+
class BlockedExecution < SolidQueue::Execution
3+
assume_attributes_from_job :concurrency_key
4+
before_create :set_expires_at
5+
6+
has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key
7+
8+
scope :expired, -> { where(expires_at: ...Time.current) }
9+
10+
class << self
11+
def unblock(count)
12+
expired.distinct.limit(count).pluck(:concurrency_key).then do |concurrency_keys|
13+
release_many releasable(concurrency_keys)
14+
end
15+
end
16+
17+
def release_many(concurrency_keys)
18+
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
19+
# one by one, locking each record and acquiring the semaphore individually for each of them:
20+
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
21+
end
22+
23+
def release_one(concurrency_key)
24+
ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release)
25+
end
26+
27+
private
28+
def releasable(concurrency_keys)
29+
semaphores = Semaphore.where(key: concurrency_keys).select(:key, :value).index_by(&:key)
30+
31+
# Concurrency keys without semaphore + concurrency keys with open semaphore
32+
(concurrency_keys - semaphores.keys) | semaphores.select { |key, semaphore| semaphore.value > 0 }.map(&:first)
33+
end
34+
end
35+
36+
def release
37+
transaction do
38+
if acquire_concurrency_lock
39+
promote_to_ready
40+
destroy!
41+
42+
SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
43+
end
44+
end
45+
end
46+
47+
private
48+
def set_expires_at
49+
self.expires_at = job.concurrency_duration.from_now
50+
end
51+
52+
def acquire_concurrency_lock
53+
Semaphore.wait(job)
54+
end
55+
56+
def promote_to_ready
57+
ReadyExecution.create!(ready_attributes)
58+
end
59+
end
60+
end

Diff for: app/models/solid_queue/claimed_execution.rb

+2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def perform
3131
else
3232
failed_with(result.error)
3333
end
34+
ensure
35+
job.unblock_next_blocked_job
3436
end
3537

3638
def release

Diff for: app/models/solid_queue/execution.rb

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1-
class SolidQueue::Execution < SolidQueue::Record
2-
include JobAttributes
1+
module SolidQueue
2+
class Execution < SolidQueue::Record
3+
include JobAttributes
34

4-
self.abstract_class = true
5+
self.abstract_class = true
56

6-
belongs_to :job
7+
scope :ordered, -> { order(priority: :asc, job_id: :asc) }
78

8-
alias_method :discard, :destroy
9+
belongs_to :job
10+
11+
alias_method :discard, :destroy
12+
13+
def ready_attributes
14+
attributes.slice("job_id", "queue_name", "priority")
15+
end
16+
end
917
end

Diff for: app/models/solid_queue/job.rb

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ def enqueue_active_job(active_job, scheduled_at: Time.current)
1818
priority: active_job.priority,
1919
scheduled_at: scheduled_at,
2020
class_name: active_job.class.name,
21-
arguments: active_job.serialize
21+
arguments: active_job.serialize,
22+
concurrency_key: active_job.try(:concurrency_key)
2223
end
2324

2425
def enqueue(**kwargs)

Diff for: app/models/solid_queue/job/concurrency_controls.rb

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
module SolidQueue
2+
class Job
3+
module ConcurrencyControls
4+
extend ActiveSupport::Concern
5+
6+
included do
7+
has_one :blocked_execution, dependent: :destroy
8+
9+
delegate :concurrency_limit, :concurrency_duration, to: :job_class
10+
end
11+
12+
def unblock_next_blocked_job
13+
if release_concurrency_lock
14+
release_next_blocked_job
15+
end
16+
end
17+
18+
private
19+
def acquire_concurrency_lock
20+
return true unless concurrency_limited?
21+
22+
Semaphore.wait(self)
23+
end
24+
25+
def release_concurrency_lock
26+
return false unless concurrency_limited?
27+
28+
Semaphore.signal(self)
29+
end
30+
31+
def block
32+
BlockedExecution.create_or_find_by!(job_id: id)
33+
end
34+
35+
def release_next_blocked_job
36+
BlockedExecution.release_one(concurrency_key)
37+
end
38+
39+
def concurrency_limited?
40+
concurrency_key.present? && concurrency_limit.to_i > 0
41+
end
42+
43+
def job_class
44+
@job_class ||= class_name.safe_constantize
45+
end
46+
end
47+
end
48+
end

Diff for: app/models/solid_queue/job/executable.rb

+65-44
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,85 @@
11
module SolidQueue
2-
module Job::Executable
3-
extend ActiveSupport::Concern
2+
class Job
3+
module Executable
4+
extend ActiveSupport::Concern
45

5-
included do
6-
has_one :ready_execution, dependent: :destroy
7-
has_one :claimed_execution, dependent: :destroy
8-
has_one :failed_execution, dependent: :destroy
6+
included do
7+
include ConcurrencyControls
98

10-
has_one :scheduled_execution, dependent: :destroy
9+
has_one :ready_execution, dependent: :destroy
10+
has_one :claimed_execution, dependent: :destroy
11+
has_one :failed_execution, dependent: :destroy
1112

12-
after_create :prepare_for_execution
13+
has_one :scheduled_execution, dependent: :destroy
1314

14-
scope :finished, -> { where.not(finished_at: nil) }
15-
end
16-
17-
STATUSES = %w[ ready claimed failed scheduled ]
15+
after_create :prepare_for_execution
1816

19-
STATUSES.each do |status|
20-
define_method("#{status}?") { public_send("#{status}_execution").present? }
21-
end
17+
scope :finished, -> { where.not(finished_at: nil) }
18+
end
2219

23-
def prepare_for_execution
24-
if due?
25-
ReadyExecution.create_or_find_by!(job_id: id)
26-
else
27-
ScheduledExecution.create_or_find_by!(job_id: id)
20+
%w[ ready claimed failed scheduled ].each do |status|
21+
define_method("#{status}?") { public_send("#{status}_execution").present? }
2822
end
29-
end
3023

31-
def finished!
32-
if delete_finished_jobs?
33-
destroy!
34-
else
35-
touch(:finished_at)
24+
def prepare_for_execution
25+
if due? then dispatch
26+
else
27+
schedule
28+
end
3629
end
37-
end
3830

39-
def finished?
40-
finished_at.present?
41-
end
31+
def finished!
32+
if delete_finished_jobs?
33+
destroy!
34+
else
35+
touch(:finished_at)
36+
end
37+
end
4238

43-
def failed_with(exception)
44-
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
45-
end
39+
def finished?
40+
finished_at.present?
41+
end
4642

47-
def discard
48-
destroy unless claimed?
49-
end
43+
def failed_with(exception)
44+
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
45+
end
5046

51-
def retry
52-
failed_execution&.retry
53-
end
47+
def discard
48+
destroy unless claimed?
49+
end
5450

55-
private
56-
def due?
57-
scheduled_at.nil? || scheduled_at <= Time.current
51+
def retry
52+
failed_execution&.retry
5853
end
5954

60-
def delete_finished_jobs?
61-
SolidQueue.delete_finished_jobs
55+
def failed_with(exception)
56+
FailedExecution.create_or_find_by!(job_id: id, exception: exception)
6257
end
58+
59+
private
60+
def due?
61+
scheduled_at.nil? || scheduled_at <= Time.current
62+
end
63+
64+
def dispatch
65+
if acquire_concurrency_lock then ready
66+
else
67+
block
68+
end
69+
end
70+
71+
def schedule
72+
ScheduledExecution.create_or_find_by!(job_id: id)
73+
end
74+
75+
def ready
76+
ReadyExecution.create_or_find_by!(job_id: id)
77+
end
78+
79+
80+
def delete_finished_jobs?
81+
SolidQueue.delete_finished_jobs
82+
end
83+
end
6384
end
6485
end

Diff for: app/models/solid_queue/ready_execution.rb

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
module SolidQueue
22
class ReadyExecution < Execution
33
scope :queued_as, ->(queue_name) { where(queue_name: queue_name) }
4-
scope :ordered, -> { order(priority: :asc, job_id: :asc) }
54

65
assume_attributes_from_job
76

Diff for: app/models/solid_queue/scheduled_execution.rb

+2-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
class SolidQueue::ScheduledExecution < SolidQueue::Execution
2-
scope :due, -> { where("scheduled_at <= ?", Time.current) }
2+
scope :due, -> { where(scheduled_at: ..Time.current) }
33
scope :ordered, -> { order(scheduled_at: :asc, priority: :asc) }
44
scope :next_batch, ->(batch_size) { due.ordered.limit(batch_size) }
55

@@ -10,7 +10,7 @@ def prepare_batch(batch)
1010
prepared_at = Time.current
1111

1212
rows = batch.map do |scheduled_execution|
13-
scheduled_execution.execution_ready_attributes.merge(created_at: prepared_at)
13+
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
1414
end
1515

1616
if rows.any?
@@ -23,8 +23,4 @@ def prepare_batch(batch)
2323
SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}")
2424
end
2525
end
26-
27-
def execution_ready_attributes
28-
attributes.slice("job_id", "queue_name", "priority")
29-
end
3026
end

Diff for: app/models/solid_queue/semaphore.rb

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
class SolidQueue::Semaphore < SolidQueue::Record
2+
scope :available, -> { where("value > 0") }
3+
scope :expired, -> { where(expires_at: ...Time.current) }
4+
5+
class << self
6+
def wait(job)
7+
Proxy.new(job, self).wait
8+
end
9+
10+
def signal(job)
11+
Proxy.new(job, self).signal
12+
end
13+
end
14+
15+
class Proxy
16+
def initialize(job, proxied_class)
17+
@job = job
18+
@proxied_class = proxied_class
19+
end
20+
21+
def wait
22+
if semaphore = proxied_class.find_by(key: key)
23+
semaphore.value > 0 && attempt_decrement
24+
else
25+
attempt_creation
26+
end
27+
end
28+
29+
def signal
30+
attempt_increment
31+
end
32+
33+
private
34+
attr_reader :job, :proxied_class
35+
36+
def attempt_creation
37+
proxied_class.create!(key: key, value: limit - 1, expires_at: expires_at)
38+
true
39+
rescue ActiveRecord::RecordNotUnique
40+
attempt_decrement
41+
end
42+
43+
def attempt_decrement
44+
proxied_class.available.where(key: key).update_all([ "value = value - 1, expires_at = ?", expires_at ]) > 0
45+
end
46+
47+
def attempt_increment
48+
proxied_class.where(key: key, value: ...limit).update_all([ "value = value + 1, expires_at = ?", expires_at ]) > 0
49+
end
50+
51+
def key
52+
job.concurrency_key
53+
end
54+
55+
def expires_at
56+
job.concurrency_duration.from_now
57+
end
58+
59+
def limit
60+
job.concurrency_limit
61+
end
62+
end
63+
end

0 commit comments

Comments
 (0)