Skip to content

Commit bca41d6

Browse files
committed
Batch job POC
* Introduces a "batch" concept, similar to batches present in Sidekiq Pro and GoodJob * Batches monitor a set of jobs, and when those jobs are completed can fire off a final job * This introduces a SolidQueue::JobBatch model, as well as the ability to enqueue jobs and associate them with the batch * There are still more ideas to figure out, but this provides a basic batch scaffolding to spark discussion
1 parent bf6bcf2 commit bca41d6

File tree

14 files changed

+246
-2
lines changed

14 files changed

+246
-2
lines changed

Diff for: README.md

+12
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,18 @@ failed_execution.discard # This will delete the job from the system
230230

231231
We're planning to release a dashboard called _Mission Control_, where, among other things, you'll be able to examine and retry/discard failed jobs, one by one, or in bulk.
232232

233+
## Batch jobs
234+
235+
```rb
236+
SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do
237+
5.times.map { |i| SleepyJob.perform_later(i) }
238+
end
239+
240+
SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) do
241+
5.times.map { |i| SleepyJob.perform_later(i) }
242+
end
243+
```
244+
233245

234246
## Puma plugin
235247
We provide a Puma plugin if you want to run the Solid Queue's supervisor together with Puma and have Puma monitor and manage it. You just need to add

Diff for: app/models/solid_queue/claimed_execution.rb

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ def perform
4141
else
4242
failed_with(result.error)
4343
end
44+
45+
job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present?
4446
ensure
4547
job.unblock_next_blocked_job
4648
end

Diff for: app/models/solid_queue/job.rb

+5-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ class Job < Record
66

77
serialize :arguments, coder: JSON
88

9+
belongs_to :job_batch, foreign_key: :batch_id, optional: true
10+
911
class << self
1012
def enqueue_all(active_jobs)
1113
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
@@ -46,14 +48,16 @@ def create_all_from_active_jobs(active_jobs)
4648
end
4749

4850
def attributes_from_active_job(active_job)
51+
active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id
4952
{
5053
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
5154
active_job_id: active_job.job_id,
5255
priority: active_job.priority || DEFAULT_PRIORITY,
5356
scheduled_at: active_job.scheduled_at,
5457
class_name: active_job.class.name,
5558
arguments: active_job.serialize,
56-
concurrency_key: active_job.concurrency_key
59+
concurrency_key: active_job.concurrency_key,
60+
batch_id: active_job.batch_id
5761
}
5862
end
5963
end

Diff for: app/models/solid_queue/job/executable.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ def dispatch_bypassing_concurrency_limits
7878
end
7979

8080
def finished!
81-
if preserve_finished_jobs?
81+
if preserve_finished_jobs? || batch_id.present?
8282
touch(:finished_at)
8383
else
8484
destroy!

Diff for: app/models/solid_queue/job_batch.rb

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class JobBatch < Record
5+
belongs_to :job, foreign_key: :job_id, optional: true
6+
has_many :jobs, foreign_key: :batch_id
7+
8+
scope :incomplete, -> {
9+
where(finished_at: nil).where("changed_at IS NOT NULL OR last_changed_at < ?", 1.hour.ago)
10+
}
11+
12+
class << self
13+
def current_batch_id
14+
Thread.current[:current_batch_id]
15+
end
16+
17+
def enqueue(attributes = {})
18+
previous_batch_id = current_batch_id.presence || nil
19+
20+
job_batch = nil
21+
transaction do
22+
job_batch = create!(batch_attributes(attributes))
23+
Thread.current[:current_batch_id] = job_batch.id
24+
yield
25+
end
26+
27+
job_batch
28+
ensure
29+
Thread.current[:current_batch_id] = previous_batch_id
30+
end
31+
32+
def dispatch_finished_batches
33+
incomplete.order(:id).pluck(:id).each do |id|
34+
transaction do
35+
where(id:).non_blocking_lock.each(&:finish)
36+
end
37+
end
38+
end
39+
40+
private
41+
42+
def batch_attributes(attributes)
43+
attributes = case attributes
44+
in { on_finish: on_finish_klass }
45+
attributes.merge(
46+
job_class: on_finish_klass,
47+
completion_type: "success"
48+
)
49+
in { on_success: on_success_klass }
50+
attributes.merge(
51+
job_class: on_success_klass,
52+
completion_type: "success"
53+
)
54+
end
55+
56+
attributes.except(:on_finish, :on_success)
57+
end
58+
end
59+
60+
def finished?
61+
finished_at.present?
62+
end
63+
64+
def finish
65+
return if finished?
66+
reset_changed_at
67+
jobs.find_each do |next_job|
68+
# FIXME: If it's failed but is going to retry, how do we know?
69+
# Because we need to know if we will determine what the failed execution means
70+
# FIXME: use "success" vs "finish" vs "discard" `completion_type` to determine
71+
# how to analyze each job
72+
return unless next_job.finished?
73+
end
74+
75+
attrs = {}
76+
77+
if job_class.present?
78+
job_klass = job_class.constantize
79+
active_job = job_klass.perform_later(self)
80+
attrs[:job] = Job.find_by(active_job_id: active_job.job_id)
81+
end
82+
83+
update!({ finished_at: Time.zone.now }.merge(attrs))
84+
end
85+
86+
private
87+
88+
def reset_changed_at
89+
if changed_at.blank? && last_changed_at.present?
90+
update_columns(last_changed_at: Time.zone.now) # wait another hour before we check again
91+
else
92+
update_columns(changed_at: nil) # clear out changed_at so we ignore this until the next job finishes
93+
end
94+
end
95+
end
96+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
class CreateSolidQueueBatchTable < ActiveRecord::Migration[7.1]
2+
def change
3+
create_table :solid_queue_job_batches do |t|
4+
t.references :job, index: { unique: true }
5+
t.string :job_class
6+
t.string :completion_type
7+
t.datetime :finished_at
8+
t.datetime :changed_at
9+
t.datetime :last_changed_at
10+
t.timestamps
11+
12+
t.index [ :finished_at ]
13+
t.index [ :changed_at ]
14+
t.index [ :last_changed_at ]
15+
end
16+
17+
add_reference :solid_queue_jobs, :batch, index: true
18+
add_foreign_key :solid_queue_jobs, :solid_queue_job_batches, column: :batch_id, on_delete: :cascade
19+
add_foreign_key :solid_queue_job_batches, :solid_queue_jobs, column: :job_id
20+
end
21+
end

Diff for: lib/active_job/job_batch_id.rb

+22
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
# Inspired by active_job/core.rb docs
4+
# https://github.com/rails/rails/blob/1c2529b9a6ba5a1eff58be0d0373d7d9d401015b/activejob/lib/active_job/core.rb#L136
5+
module ActiveJob
6+
module JobBatchId
7+
extend ActiveSupport::Concern
8+
9+
included do
10+
attr_accessor :batch_id
11+
end
12+
13+
def serialize
14+
super.merge('batch_id' => batch_id)
15+
end
16+
17+
def deserialize(job_data)
18+
super
19+
self.batch_id = job_data['batch_id']
20+
end
21+
end
22+
end

Diff for: lib/solid_queue.rb

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
require "active_job/queue_adapters/solid_queue_adapter"
77
require "active_job/concurrency_controls"
8+
require "active_job/job_batch_id"
89

910
require "solid_queue/app_executor"
1011
require "solid_queue/processes/supervised"

Diff for: lib/solid_queue/dispatcher.rb

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def run
3030
def dispatch_next_batch
3131
with_polling_volume do
3232
SolidQueue::ScheduledExecution.dispatch_next_batch(batch_size)
33+
SolidQueue::JobBatch.dispatch_finished_batches
3334
end
3435
end
3536

Diff for: lib/solid_queue/engine.rb

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class Engine < ::Rails::Engine
3333
initializer "solid_queue.active_job.extensions" do
3434
ActiveSupport.on_load :active_job do
3535
include ActiveJob::ConcurrencyControls
36+
include ActiveJob::JobBatchId
3637
end
3738
end
3839
end

Diff for: test/dummy/app/jobs/batch_completion_job.rb

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class BatchCompletionJob < ApplicationJob
2+
queue_as :background
3+
4+
def perform(batch)
5+
Rails.logger.info "#{batch.jobs.size} jobs completed!"
6+
end
7+
end

Diff for: test/dummy/app/jobs/sleepy_job.rb

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
class SleepyJob < ApplicationJob
2+
queue_as :background
3+
4+
retry_on Exception, wait: 30.seconds, attempts: 5
5+
6+
def perform(seconds_to_sleep)
7+
Rails.logger.info "Feeling #{seconds_to_sleep} seconds sleepy..."
8+
sleep seconds_to_sleep
9+
end
10+
end

Diff for: test/dummy/db/schema.rb

+19
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,21 @@
4646
t.index ["job_id"], name: "index_solid_queue_failed_executions_on_job_id", unique: true
4747
end
4848

49+
create_table "solid_queue_job_batches", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
50+
t.bigint "job_id"
51+
t.string "job_class"
52+
t.string "completion_type"
53+
t.datetime "finished_at"
54+
t.datetime "changed_at"
55+
t.datetime "last_changed_at"
56+
t.datetime "created_at", null: false
57+
t.datetime "updated_at", null: false
58+
t.index ["changed_at"], name: "index_solid_queue_job_batches_on_changed_at"
59+
t.index ["finished_at"], name: "index_solid_queue_job_batches_on_finished_at"
60+
t.index ["job_id"], name: "index_solid_queue_job_batches_on_job_id", unique: true
61+
t.index ["last_changed_at"], name: "index_solid_queue_job_batches_on_last_changed_at"
62+
end
63+
4964
create_table "solid_queue_jobs", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
5065
t.string "queue_name", null: false
5166
t.string "class_name", null: false
@@ -57,7 +72,9 @@
5772
t.string "concurrency_key"
5873
t.datetime "created_at", null: false
5974
t.datetime "updated_at", null: false
75+
t.bigint "batch_id"
6076
t.index ["active_job_id"], name: "index_solid_queue_jobs_on_active_job_id"
77+
t.index ["batch_id"], name: "index_solid_queue_jobs_on_batch_id"
6178
t.index ["class_name"], name: "index_solid_queue_jobs_on_class_name"
6279
t.index ["finished_at"], name: "index_solid_queue_jobs_on_finished_at"
6380
t.index ["queue_name", "finished_at"], name: "index_solid_queue_jobs_for_filtering"
@@ -116,6 +133,8 @@
116133
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
117134
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
118135
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
136+
add_foreign_key "solid_queue_job_batches", "solid_queue_jobs", column: "job_id"
137+
add_foreign_key "solid_queue_jobs", "solid_queue_job_batches", column: "batch_id", on_delete: :cascade
119138
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
120139
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
121140
end

Diff for: test/models/solid_queue/job_batch_test.rb

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
require "test_helper"
2+
3+
class SolidQueue::JobBatchTest < ActiveSupport::TestCase
4+
self.use_transactional_tests = false
5+
6+
teardown do
7+
SolidQueue::Job.destroy_all
8+
SolidQueue::JobBatch.destroy_all
9+
end
10+
11+
class NiceJob < ApplicationJob
12+
retry_on Exception, wait: 1.second
13+
14+
def perform(arg)
15+
Rails.logger.info "Hi #{arg}!"
16+
end
17+
end
18+
19+
test "batch will be completed on success" do
20+
batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) {}
21+
assert_equal "success", batch.completion_type
22+
assert_equal BatchCompletionJob.name, batch.job_class
23+
end
24+
25+
test "batch will be completed on finish" do
26+
batch = SolidQueue::JobBatch.enqueue(on_success: BatchCompletionJob) {}
27+
assert_equal "success", batch.completion_type
28+
assert_equal BatchCompletionJob.name, batch.job_class
29+
end
30+
31+
test "sets the batch_id on jobs created inside of the enqueue block" do
32+
batch = SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do
33+
NiceJob.perform_later("world")
34+
NiceJob.perform_later("people")
35+
end
36+
37+
assert_equal 2, SolidQueue::Job.count
38+
assert_equal [batch.id] * 2, SolidQueue::Job.last(2).map(&:batch_id)
39+
end
40+
41+
test "batch id is present inside the block" do
42+
assert_nil SolidQueue::JobBatch.current_batch_id
43+
SolidQueue::JobBatch.enqueue(on_finish: BatchCompletionJob) do
44+
assert_not_nil SolidQueue::JobBatch.current_batch_id
45+
end
46+
assert_nil SolidQueue::JobBatch.current_batch_id
47+
end
48+
end

0 commit comments

Comments
 (0)