Skip to content

Commit a386512

Browse files
committed
Rename Semaphore.concurrency_key to Semaphore.key and simplify method sigantures
We can simply pass a job, since both Active Job and Solid Queue's Job respond to the same methods, required for the semaphore to be waited on, or signaled. The "concurrency" part of the "concurrency_key" attribute is redundant.
1 parent 1961978 commit a386512

File tree

6 files changed

+24
-24
lines changed

6 files changed

+24
-24
lines changed

app/models/solid_queue/blocked_execution.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module SolidQueue
22
class BlockedExecution < SolidQueue::Execution
33
assume_attributes_from_job :concurrency_key
44

5-
has_one :semaphore, foreign_key: :concurrency_key, primary_key: :concurrency_key
5+
has_one :semaphore, foreign_key: :key, primary_key: :concurrency_key
66

77
scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
88
scope :ordered, -> { order(priority: :asc) }
@@ -36,7 +36,7 @@ def release
3636

3737
private
3838
def acquire_concurrency_lock
39-
Semaphore.wait_for(concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
39+
Semaphore.wait(job)
4040
end
4141

4242
def promote_to_ready

app/models/solid_queue/job/concurrency_controls.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,13 @@ def unblock_blocked_jobs
1919
def acquire_concurrency_lock
2020
return true unless concurrency_limited?
2121

22-
Semaphore.wait_for(concurrency_key, concurrency_limit, concurrency_limit_duration)
22+
Semaphore.wait(self)
2323
end
2424

2525
def release_concurrency_lock
2626
return false unless concurrency_limited?
2727

28-
Semaphore.release(concurrency_key, concurrency_limit, concurrency_limit_duration)
28+
Semaphore.signal(self)
2929
end
3030

3131
def block

app/models/solid_queue/semaphore.rb

+13-13
Original file line numberDiff line numberDiff line change
@@ -4,32 +4,32 @@ class SolidQueue::Semaphore < SolidQueue::Record
44
scope :expired, -> { where(expires_at: ...Time.current)}
55

66
class << self
7-
def wait_for(concurrency_key, limit, duration)
8-
if semaphore = find_by(concurrency_key: concurrency_key)
9-
semaphore.value > 0 && attempt_decrement(concurrency_key, duration)
7+
def wait(job)
8+
if semaphore = find_by(key: job.concurrency_key)
9+
semaphore.value > 0 && attempt_decrement(job.concurrency_key, job.concurrency_limit_duration)
1010
else
11-
attempt_creation(concurrency_key, limit, duration)
11+
attempt_creation(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
1212
end
1313
end
1414

15-
def release(concurrency_key, limit, duration)
16-
attempt_increment(concurrency_key, limit, duration)
15+
def signal(job)
16+
attempt_increment(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
1717
end
1818

1919
private
20-
def attempt_creation(concurrency_key, limit, duration)
21-
create!(concurrency_key: concurrency_key, value: limit - 1, expires_at: duration.from_now)
20+
def attempt_creation(key, limit, duration)
21+
create!(key: key, value: limit - 1, expires_at: duration.from_now)
2222
true
2323
rescue ActiveRecord::RecordNotUnique
24-
attempt_decrement(concurrency_key, duration)
24+
attempt_decrement(key, duration)
2525
end
2626

27-
def attempt_decrement(concurrency_key, duration)
28-
available.where(concurrency_key: concurrency_key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0
27+
def attempt_decrement(key, duration)
28+
available.where(key: key).update_all([ "value = value - 1, expires_at = ?", duration.from_now ]) > 0
2929
end
3030

31-
def attempt_increment(concurrency_key, limit, duration)
32-
where("value < ?", limit).where(concurrency_key: concurrency_key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0
31+
def attempt_increment(key, limit, duration)
32+
where("value < ?", limit).where(key: key).update_all([ "value = value + 1, expires_at = ?", duration.from_now ]) > 0
3333
end
3434
end
3535
end

db/migrate/20231103204612_create_solid_queue_concurrency_controls.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def change
1717
end
1818

1919
create_table :solid_queue_semaphores do |t|
20-
t.string :concurrency_key, null: false, index: { unique: true }
20+
t.string :key, null: false, index: { unique: true }
2121
t.integer :value, null: false, default: 1
2222
t.datetime :expires_at, null: false, index: true
2323

test/dummy/db/schema.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,13 @@
9696
end
9797

9898
create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
99-
t.string "concurrency_key", null: false
99+
t.string "key", null: false
100100
t.integer "value", default: 1, null: false
101101
t.datetime "expires_at", null: false
102102
t.datetime "created_at", null: false
103103
t.datetime "updated_at", null: false
104-
t.index ["concurrency_key"], name: "index_solid_queue_semaphores_on_concurrency_key", unique: true
105104
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
105+
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
106106
end
107107

108108
end

test/integration/concurrency_controls_test.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
9191

9292
# Lock the semaphore so we can enqueue jobs and leave them blocked
9393
skip_active_record_query_cache do
94-
assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
94+
assert SolidQueue::Semaphore.wait(job)
9595
end
9696

9797
# Now enqueue more jobs under that same key. They'll be all locked. Use priorities
@@ -104,7 +104,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
104104

105105
# Then unlock the semaphore: this would be as if the first job had released
106106
# the semaphore but hadn't unblocked any jobs
107-
assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
107+
assert SolidQueue::Semaphore.signal(job)
108108

109109
# And wait for workers to release the jobs
110110
wait_for_jobs_to_finish_for(2.seconds)
@@ -123,7 +123,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
123123

124124
# Lock the semaphore so we can enqueue jobs and leave them blocked
125125
skip_active_record_query_cache do
126-
assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit, job.concurrency_limit_duration)
126+
assert SolidQueue::Semaphore.wait(job)
127127
end
128128

129129
# Now enqueue more jobs under that same key. They'll be all locked
@@ -134,7 +134,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
134134
end
135135

136136
# Simulate semaphore expiration
137-
SolidQueue::Semaphore.find_by(concurrency_key: job.concurrency_key).update(expires_at: 1.hour.ago)
137+
SolidQueue::Semaphore.find_by(key: job.concurrency_key).update(expires_at: 1.hour.ago)
138138

139139
# And wait for scheduler to release the jobs
140140
wait_for_jobs_to_finish_for(2.seconds)

0 commit comments

Comments
 (0)