Skip to content

Commit 01bc12e

Browse files
committed
Configure jobs to be enqueued on different shards
1 parent e470071 commit 01bc12e

File tree

6 files changed

+233
-4
lines changed

6 files changed

+233
-4
lines changed

lib/active_job/queue_adapters/solid_queue_adapter.rb

+19-3
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,36 @@ module QueueAdapters
88
#
99
# Rails.application.config.active_job.queue_adapter = :solid_queue
1010
class SolidQueueAdapter
11+
def initialize(db_shard: nil)
12+
@db_shard = db_shard
13+
end
14+
1115
def enqueue_after_transaction_commit?
1216
true
1317
end
1418

1519
def enqueue(active_job) # :nodoc:
16-
SolidQueue::Job.enqueue(active_job)
20+
select_shard { SolidQueue::Job.enqueue(active_job) }
1721
end
1822

1923
def enqueue_at(active_job, timestamp) # :nodoc:
20-
SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
24+
select_shard do
25+
SolidQueue::Job.enqueue(active_job, scheduled_at: Time.at(timestamp))
26+
end
2127
end
2228

2329
def enqueue_all(active_jobs) # :nodoc:
24-
SolidQueue::Job.enqueue_all(active_jobs)
30+
select_shard { SolidQueue::Job.enqueue_all(active_jobs) }
31+
end
32+
33+
private
34+
35+
def select_shard
36+
if @db_shard
37+
ActiveRecord::Base.connected_to(shard: @db_shard) { yield }
38+
else
39+
yield
40+
end
2541
end
2642
end
2743
end

test/dummy/app/jobs/shard_two_job.rb

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
class ShardTwoJob < ApplicationJob
2+
self.queue_adapter = ActiveJob::QueueAdapters::SolidQueueAdapter.new(db_shard: :queue_shard_two)
3+
queue_as :background
4+
5+
def perform(arg)
6+
JobBuffer.add(arg)
7+
end
8+
end

test/dummy/config/database.yml

+8
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ development:
4848
<<: *default
4949
database: <%= database_name_from("development_queue") %>
5050
migrations_paths: db/queue_migrate
51+
queue_shard_two:
52+
<<: *default
53+
database: <%= database_name_from("development_queue_shard_two") %>
54+
migrations_paths: db/queue_migrate
5155

5256
test:
5357
primary:
@@ -65,3 +69,7 @@ test:
6569
<<: *default
6670
database: <%= database_name_from("test_queue") %>
6771
migrations_paths: db/queue_migrate
72+
queue_shard_two:
73+
<<: *default
74+
database: <%= database_name_from("test_queue_shard_two") %>
75+
migrations_paths: db/queue_migrate

test/dummy/config/environments/test.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,12 @@
4949

5050
# Replace the default in-process and non-durable queuing backend for Active Job.
5151
config.active_job.queue_adapter = :solid_queue
52-
config.solid_queue.connects_to = { database: { writing: :queue } }
52+
config.solid_queue.connects_to = {
53+
shards: {
54+
queue: { writing: :queue },
55+
queue_shard_two: { writing: :queue_shard_two }
56+
}
57+
}
5358

5459
# Annotate rendered view with file names.
5560
# config.action_view.annotate_rendered_view_with_filenames = true
+141
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
# This file is auto-generated from the current state of the database. Instead
2+
# of editing this file, please use the migrations feature of Active Record to
3+
# incrementally modify your database, and then regenerate this schema definition.
4+
#
5+
# This file is the source Rails uses to define your schema when running `bin/rails
6+
# db:schema:load`. When creating a new database, `bin/rails db:schema:load` tends to
7+
# be faster and is potentially less error prone than running all of your
8+
# migrations from scratch. Old migrations may fail to apply correctly if those
9+
# migrations use external dependencies or application code.
10+
#
11+
# It's strongly recommended that you check this file into your version control system.
12+
13+
ActiveRecord::Schema[7.1].define(version: 1) do
14+
create_table "solid_queue_blocked_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
15+
t.bigint "job_id", null: false
16+
t.string "queue_name", null: false
17+
t.integer "priority", default: 0, null: false
18+
t.string "concurrency_key", null: false
19+
t.datetime "expires_at", null: false
20+
t.datetime "created_at", null: false
21+
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
22+
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
23+
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
24+
end
25+
26+
create_table "solid_queue_claimed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
27+
t.bigint "job_id", null: false
28+
t.bigint "process_id"
29+
t.datetime "created_at", null: false
30+
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
31+
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
32+
end
33+
34+
create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
35+
t.bigint "job_id", null: false
36+
t.text "error"
37+
t.datetime "created_at", null: false
38+
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
39+
end
40+
41+
create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
42+
t.string "queue_name", null: false
43+
t.string "class_name", null: false
44+
t.text "arguments"
45+
t.integer "priority", default: 0, null: false
46+
t.string "active_job_id"
47+
t.datetime "scheduled_at"
48+
t.datetime "finished_at"
49+
t.string "concurrency_key"
50+
t.datetime "created_at", null: false
51+
t.datetime "updated_at", null: false
52+
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
53+
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
54+
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
55+
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
56+
t.index ["scheduled_at", "finished_at"], name: "index_solid_queue_jobs_for_alerting"
57+
end
58+
59+
create_table "solid_queue_pauses", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
60+
t.string "queue_name", null: false
61+
t.datetime "created_at", null: false
62+
t.index ["queue_name"], name: "index_solid_queue_pauses_on_queue_name", unique: true
63+
end
64+
65+
create_table "solid_queue_processes", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
66+
t.string "kind", null: false
67+
t.datetime "last_heartbeat_at", null: false
68+
t.bigint "supervisor_id"
69+
t.integer "pid", null: false
70+
t.string "hostname"
71+
t.text "metadata"
72+
t.datetime "created_at", null: false
73+
t.string "name", null: false
74+
t.index ["last_heartbeat_at"], name: "index_solid_queue_processes_on_last_heartbeat_at"
75+
t.index ["name", "supervisor_id"], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
76+
t.index ["supervisor_id"], name: "index_solid_queue_processes_on_supervisor_id"
77+
end
78+
79+
create_table "solid_queue_ready_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
80+
t.bigint "job_id", null: false
81+
t.string "queue_name", null: false
82+
t.integer "priority", default: 0, null: false
83+
t.datetime "created_at", null: false
84+
t.index ["job_id"], name: "index_solid_queue_ready_executions_on_job_id", unique: true
85+
t.index ["priority", "job_id"], name: "index_solid_queue_poll_all"
86+
t.index ["queue_name", "priority", "job_id"], name: "index_solid_queue_poll_by_queue"
87+
end
88+
89+
create_table "solid_queue_recurring_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
90+
t.bigint "job_id", null: false
91+
t.string "task_key", null: false
92+
t.datetime "run_at", null: false
93+
t.datetime "created_at", null: false
94+
t.index ["job_id"], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
95+
t.index ["task_key", "run_at"], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
96+
end
97+
98+
create_table "solid_queue_recurring_tasks", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
99+
t.string "key", null: false
100+
t.string "schedule", null: false
101+
t.string "command", limit: 2048
102+
t.string "class_name"
103+
t.text "arguments"
104+
t.string "queue_name"
105+
t.integer "priority", default: 0
106+
t.boolean "static", default: true, null: false
107+
t.text "description"
108+
t.datetime "created_at", null: false
109+
t.datetime "updated_at", null: false
110+
t.index ["key"], name: "index_solid_queue_recurring_tasks_on_key", unique: true
111+
t.index ["static"], name: "index_solid_queue_recurring_tasks_on_static"
112+
end
113+
114+
create_table "solid_queue_scheduled_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
115+
t.bigint "job_id", null: false
116+
t.string "queue_name", null: false
117+
t.integer "priority", default: 0, null: false
118+
t.datetime "scheduled_at", null: false
119+
t.datetime "created_at", null: false
120+
t.index ["job_id"], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
121+
t.index ["scheduled_at", "priority", "job_id"], name: "index_solid_queue_dispatch_all"
122+
end
123+
124+
create_table "solid_queue_semaphores", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
125+
t.string "key", null: false
126+
t.integer "value", default: 1, null: false
127+
t.datetime "expires_at", null: false
128+
t.datetime "created_at", null: false
129+
t.datetime "updated_at", null: false
130+
t.index ["expires_at"], name: "index_solid_queue_semaphores_on_expires_at"
131+
t.index ["key", "value"], name: "index_solid_queue_semaphores_on_key_and_value"
132+
t.index ["key"], name: "index_solid_queue_semaphores_on_key", unique: true
133+
end
134+
135+
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
136+
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
137+
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
138+
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
139+
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
140+
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
141+
end

test/unit/multisharding_test.rb

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
require "test_helper"
2+
3+
class MultishardingTest < ActiveSupport::TestCase
4+
test "jobs are enqueued in the right shard" do
5+
assert_difference -> { SolidQueue::Job.count }, 1 do
6+
assert_difference -> do
7+
ActiveRecord::Base.connected_to(
8+
shard: :queue_shard_two
9+
) { SolidQueue::Job.count }
10+
end,
11+
1 do
12+
AddToBufferJob.perform_later "hey!"
13+
ShardTwoJob.perform_later "coucou!"
14+
end
15+
end
16+
end
17+
18+
test "jobs are enqueued for later in the right shard" do
19+
assert_difference -> { SolidQueue::ScheduledExecution.count }, 1 do
20+
assert_difference -> do
21+
ActiveRecord::Base.connected_to(
22+
shard: :queue_shard_two
23+
) { SolidQueue::ScheduledExecution.count }
24+
end,
25+
1 do
26+
AddToBufferJob.set(wait: 1).perform_later "hey!"
27+
ShardTwoJob.set(wait: 1).perform_later "coucou!"
28+
end
29+
end
30+
end
31+
32+
test "jobs are enqueued in bulk in the right shard" do
33+
active_jobs = [
34+
AddToBufferJob.new(2),
35+
ShardTwoJob.new(6),
36+
AddToBufferJob.new(3),
37+
ShardTwoJob.new(7)
38+
]
39+
40+
assert_difference -> { SolidQueue::Job.count }, 2 do
41+
assert_difference -> do
42+
ActiveRecord::Base.connected_to(
43+
shard: :queue_shard_two
44+
) { SolidQueue::Job.count }
45+
end,
46+
2 do
47+
ActiveJob.perform_all_later(active_jobs)
48+
end
49+
end
50+
end
51+
end

0 commit comments

Comments
 (0)