Skip to content

Commit 8f6850c

Browse files
committed
Enable shard selection for processes
1 parent 01bc12e commit 8f6850c

File tree

5 files changed

+48
-2
lines changed

5 files changed

+48
-2
lines changed

lib/active_job/queue_adapters/solid_queue_adapter.rb

+4-2
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ def enqueue_all(active_jobs) # :nodoc:
3333
private
3434

3535
def select_shard
36-
if @db_shard
37-
ActiveRecord::Base.connected_to(shard: @db_shard) { yield }
36+
shard = @db_shard || SolidQueue.primary_shard
37+
38+
if shard
39+
ActiveRecord::Base.connected_to(shard: shard) { yield }
3840
else
3941
yield
4042
end

lib/solid_queue.rb

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ module SolidQueue
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
4343

44+
mattr_accessor :primary_shard, :active_shard
45+
4446
delegate :on_start, :on_stop, to: Supervisor
4547

4648
def on_worker_start(...)

lib/solid_queue/processes/base.rb

+17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,23 @@ class Base
66
include Callbacks # Defines callbacks needed by other concerns
77
include AppExecutor, Registrable, Interruptible, Procline
88

9+
after_boot -> do
10+
if SolidQueue.connects_to.key?(:shards)
11+
# Record the name of the primary shard, which should be used for
12+
# adapter less jobs
13+
if SolidQueue.primary_shard.nil?
14+
SolidQueue.primary_shard = SolidQueue.connects_to[:shards].keys.first
15+
end
16+
17+
# Move active_shard to first position in connects_to[:shards] Hash to
18+
# make it the default
19+
if SolidQueue.active_shard.present? &&
20+
SolidQueue.connects_to[:shards].key?(SolidQueue.active_shard)
21+
SolidQueue::Record.default_shard = SolidQueue.active_shard
22+
end
23+
end
24+
end
25+
926
attr_reader :name
1027

1128
def initialize(*)

test/integration/jobs_lifecycle_test.rb

+16
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,22 @@ class JobsLifecycleTest < ActiveSupport::TestCase
3030
assert_equal 2, SolidQueue::Job.finished.count
3131
end
3232

33+
test "enqueue and run jobs from different shards" do
34+
AddToBufferJob.perform_later "hey"
35+
ShardTwoJob.perform_later "ho"
36+
37+
change_active_shard_to(:queue_shard_two) do
38+
@dispatcher.start
39+
@worker.start
40+
41+
wait_for_jobs_to_finish_for(2.seconds)
42+
end
43+
44+
assert_equal [ "ho" ], JobBuffer.values.sort
45+
assert_equal 1, SolidQueue::ReadyExecution.count
46+
assert_equal 1, ActiveRecord::Base.connected_to(shard: :queue_shard_two) { SolidQueue::Job.finished.count }
47+
end
48+
3349
test "enqueue and run jobs that fail without retries" do
3450
RaisingJob.perform_later(ExpectedTestError, "A")
3551
RaisingJob.perform_later(ExpectedTestError, "B")

test/test_helpers/jobs_test_helper.rb

+9
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,13 @@ def assert_claimed_jobs(count = 1)
3434
assert_equal count, SolidQueue::ClaimedExecution.count
3535
end
3636
end
37+
38+
def change_active_shard_to(new_shard_name, &block)
39+
old_shard_name = SolidQueue.active_shard
40+
SolidQueue.active_shard = new_shard_name
41+
block.call
42+
ensure
43+
SolidQueue.active_shard = old_shard_name
44+
SolidQueue::Record.connects_to(**SolidQueue.connects_to) if SolidQueue.connects_to
45+
end
3746
end

0 commit comments

Comments
 (0)