Skip to content

Commit c40c531

Browse files
committed
Fixes rails#262: Automatic worker process recycling
This PR adds two new configuration parameters: * recycle_on_oom to the Worker (via queue.yml) * calc_memory_usage as a global parameter (via application.rb, environment/*.rb, or an initializer) There are no specific unit requirements placed on either of these new parameters. What's important is: They use the same order of magnitude and they are comparable. For example, if the calc_memory_usage proc returns 300Mb as 300 (as in Megabytes) then the recycle_on_oom set on the work should be 300 too. Any worker without recycle_on_oom is not impacted in anyway. If the calc_memory_usage is nil (default), then this oom checking it off for workers under the control of this Supervisor. The check for OOM is made after the Job has run to completion and before the SolidQueue worker does any additional processing. The single biggest change to SolidQueue, that probably requires the most review is moving the job.unblock_next_blocked_job out of ClaimedExecution and up one level into Pool. The rational for this change is that the ensure block on the Job execution is not guarrenteed to run if the system / thread is forcibly shutdown while the job is inflight. However, the Thread.ensure *does* seem to get called reliably on forced shutdowns. Give my almost assuredly incomplete understanding of the concurrency implementation despite Rosa working very hard to help me to grok it, there is some risk here that this change is wrong. My logic for this change is as follows: * A job that complete successfully would have release its lock -- no change * A job that completes by way of an unhandled exception would have released its lock -- no change * A job that was killed inflight because of a worker recycle_on_oom (or an ugly restart out of the users control -- again, looking at you Heroku) needs to release its lock -- there is no guarantee that its going to be the job that starts on the worker restart. If release its lock in this use-case, then it doesn't, then that worker could find itself waiting on the dispatcher (I think) to expire Semaphores before it is able to take on new work. Small fix
1 parent 67b964d commit c40c531

File tree

10 files changed

+422
-11
lines changed

10 files changed

+422
-11
lines changed

README.md

+107-3
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ For small projects, you can run Solid Queue on the same machine as your webserve
5353

5454
**Note**: future changes to the schema will come in the form of regular migrations.
5555

56-
5756
### Single database configuration
5857

5958
Running Solid Queue in a separate database is recommended, but it's also possible to use one single database for both the app and the queue. Just follow these steps:
@@ -99,7 +98,6 @@ By default, Solid Queue will try to find your configuration under `config/queue.
9998
bin/jobs -c config/calendar.yml
10099
```
101100

102-
103101
This is what this configuration looks like:
104102

105103
```yml
@@ -236,6 +234,7 @@ There are several settings that control how Solid Queue works that you can set a
236234
- `preserve_finished_jobs`: whether to keep finished jobs in the `solid_queue_jobs` table—defaults to `true`.
237235
- `clear_finished_jobs_after`: period to keep finished jobs around, in case `preserve_finished_jobs` is true—defaults to 1 day. **Note:** Right now, there's no automatic cleanup of finished jobs. You'd need to do this by periodically invoking `SolidQueue::Job.clear_finished_in_batches`, but this will happen automatically in the near future.
238236
- `default_concurrency_control_period`: the value to be used as the default for the `duration` parameter in [concurrency controls](#concurrency-controls). It defaults to 3 minutes.
237+
- `calc_memory_usage`: a proc returns the memory consumption of the process(es) that you want to measure. It yields the Worker process PID and runs in the context of the Worker that is configured with `recycle_on_oom`. [Read more](#memory-consumption).
239238

240239
## Errors when enqueuing
241240

@@ -428,7 +427,112 @@ my_periodic_resque_job:
428427
schedule: "*/5 * * * *"
429428
```
430429

431-
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any `solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once each time.
430+
and the job will be enqueued via `perform_later` so it'll run in Resque. However, in this case we won't track any
431+
`solid_queue_recurring_execution` record for it and there won't be any guarantees that the job is enqueued only once
432+
each time.
433+
434+
## Recycle On OOM
435+
436+
This feature recycles / restarts a worker whenever it exceeds the specified memory threshold. This is particularly
437+
useful for jobs with high memory consumption or when deploying in a memory-constrained environment.
438+
439+
If the result of the `calc_memory_usage` Proc is greater than the `recycle_on_oom` value configured on a specific
440+
worker, that worker will restart. It's important that the units returned by the `calc_memory_usage` Proc match the units
441+
of the `recycle_on_oom` value.
442+
For instance, if the `calc_memory_usage` Proc returns a value MB (i.e., 300 Vs. 300_000_000), the `recycle_on_oom` value
443+
should also be specified in MB.
444+
445+
Using the `get_process_memory` gem, and configuring it return an integer value in MB, you can configure SolidQueue as
446+
follows:
447+
448+
```ruby
449+
# application.rb, production.rb, or
450+
# initializer/solid_queue.rb file
451+
Rails.application.config.solid_queue.calc_memory_usage = ->(pid) { GetProcessMem.new(pid).mb.round(0) }
452+
```
453+
454+
Here is an example of configuring a worker to recycle when memory usage exceeds 200MB:
455+
456+
```yml
457+
worker:
458+
queues: "*"
459+
threads: 3
460+
polling_interval: 2
461+
recycle_on_oom: 200
462+
```
463+
464+
You can also use the `calc_memory_usage` Proc to compute the memory usage across multiple processes:
465+
466+
```ruby
467+
SolidQueue.configure do |config|
468+
config.calc_memory_usage = ->(_) do
469+
SolidQueue::Process.pluck(:pid).sum do |pid|
470+
GetProcessMem.new(pid).mb.round(0)
471+
rescue StandardError
472+
0 # just in case the process for the pid is no longer running
473+
end
474+
end
475+
end
476+
```
477+
478+
Then, set the worker to recycle based on the aggregate maximum memory usage of all processes:
479+
480+
```yml
481+
worker:
482+
queues: "*"
483+
threads: 3
484+
polling_interval: 2
485+
recycle_on_oom: 512
486+
```
487+
488+
Be cautious when using this feature, as it can lead to restarting the worker after each job if not properly configured.
489+
It is advisable to be especially careful using threads with workers configured with `recycle_on_oom`.
490+
For example, two queues — `slow_low_memory` and `fast_high_memory` — could easily result in the slow_low_memory jobs
491+
never completing due to the fast_high_memory jobs triggering the Worker tp recycle without allowing the slow_low_memory
492+
jobs enough time to run to completion.
493+
494+
### A Brief Digression
495+
This is a good time to mention if you choose to use `recycle_on_oom` with threads, then your jobs *really, really should*
496+
be **idempotent** -- a very fancy way of saying that a job could easily be started and stopped multiple times
497+
(see previous paragraph) so it critical than the job be designed in a way to allow for multiple runs before it completes
498+
without doing anything *"unseemly"* (such as email a customer with the same message with each restart).
499+
500+
### Finishing recycle_on_oom
501+
Anytime a Worker is recycled due to memory consumption, it will emit a standard SolidQueue log message labeled: "Worker
502+
OOM". It will report the memory usage that triggered the restart and the vital statistics of the Worker process.
503+
SolidQueue will also output it's standard messaging about the Worker starting and registering.
504+
505+
Other ideas that might help with memory constrained environments include:
506+
507+
```ruby
508+
SolidQueue.on_start do
509+
# If supported by your environment
510+
# This setting will be inherited by all processes started by this Supervisor, including recycled Workers
511+
GC.auto_compact = true
512+
513+
Process.warmup
514+
end
515+
```
516+
517+
and
518+
519+
```ruby
520+
SolidQueue.on_worker_start { Process.warmup }
521+
```
522+
523+
```yml
524+
worker:
525+
queues: "*"
526+
threads: 3
527+
polling_interval: 2
528+
recycle_on_oom: 0
529+
```
530+
531+
will effectively restart at the end of every job.
532+
533+
Finally, triggering a full GC via either after_perform, around_perform, or the end of your Job can't hurt, as it will
534+
run prior to the memory
535+
check by the Worker.
432536

433537
## Inspiration
434538

app/models/solid_queue/claimed_execution.rb

-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,6 @@ def perform
6565
else
6666
failed_with(result.error)
6767
end
68-
ensure
69-
job.unblock_next_blocked_job
7068
end
7169

7270
def release

lib/solid_queue.rb

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ module SolidQueue
4040
mattr_accessor :preserve_finished_jobs, default: true
4141
mattr_accessor :clear_finished_jobs_after, default: 1.day
4242
mattr_accessor :default_concurrency_control_period, default: 3.minutes
43+
mattr_accessor :calc_memory_usage, default: nil
4344

4445
delegate :on_start, :on_stop, to: Supervisor
4546

lib/solid_queue/log_subscriber.rb

+14
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,20 @@ def enqueue_recurring_task(event)
6161
end
6262
end
6363

64+
def recycle_worker(event)
65+
process = event.payload[:process]
66+
67+
attributes = {
68+
memory_used: event.payload[:memory_used],
69+
pid: process.pid,
70+
hostname: process.hostname,
71+
process_id: process.process_id,
72+
name: process.name
73+
}
74+
75+
warn formatted_event(event, action: "#{process.kind} OOM", **attributes)
76+
end
77+
6478
def start_process(event)
6579
process = event.payload[:process]
6680

lib/solid_queue/pool.rb

+12-4
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,23 @@ def initialize(size, on_idle: nil)
1515
@mutex = Mutex.new
1616
end
1717

18-
def post(execution)
18+
def post(execution, worker)
1919
available_threads.decrement
2020

21-
future = Concurrent::Future.new(args: [ execution ], executor: executor) do |thread_execution|
21+
future = Concurrent::Future.new(args: [ execution, worker ], executor: executor) do |thread_execution, worker_execution|
2222
wrap_in_app_executor do
2323
thread_execution.perform
2424
ensure
25-
available_threads.increment
26-
mutex.synchronize { on_idle.try(:call) if idle? }
25+
wrap_in_app_executor do
26+
execution.job.unblock_next_blocked_job
27+
end
28+
29+
if worker_execution.oom?
30+
worker_execution.recycle(execution)
31+
else
32+
available_threads.increment
33+
mutex.synchronize { on_idle.try(:call) if idle? }
34+
end
2735
end
2836
end
2937

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
# frozen_string_literal: true
2+
3+
require "active_support/concern"
4+
5+
module SolidQueue::Processes
6+
module Recyclable
7+
extend ActiveSupport::Concern
8+
9+
included do
10+
attr_reader :max_memory, :calc_memory_usage
11+
end
12+
13+
def recyclable_setup(**options)
14+
return unless configured?(options)
15+
16+
set_max_memory(options[:recycle_on_oom])
17+
set_calc_memory_usage if max_memory
18+
SolidQueue.logger.error { "Recycle on OOM is disabled for worker #{pid}" } unless oom_configured?
19+
end
20+
21+
def recycle(execution = nil)
22+
return false if !oom_configured? || stopped?
23+
24+
memory_used = calc_memory_usage.call(pid)
25+
return false unless memory_exceeded?(memory_used)
26+
27+
SolidQueue.instrument(:recycle_worker, process: self, memory_used: memory_used, class_name: execution&.job&.class_name) do
28+
pool.shutdown
29+
stop
30+
end
31+
32+
true
33+
end
34+
35+
def oom?
36+
oom_configured? && calc_memory_usage.call(pid) > max_memory
37+
end
38+
39+
private
40+
41+
def configured?(options)
42+
options.key?(:recycle_on_oom)
43+
end
44+
45+
def oom_configured?
46+
@oom_configured ||= max_memory.present? && calc_memory_usage.present?
47+
end
48+
49+
def memory_exceeded?(memory_used)
50+
memory_used > max_memory
51+
end
52+
53+
def set_max_memory(max_memory)
54+
if max_memory > 0
55+
@max_memory = max_memory
56+
else
57+
SolidQueue.logger.error { "Invalid value for recycle_on_oom: #{max_memory}." }
58+
end
59+
end
60+
61+
def set_calc_memory_usage
62+
if SolidQueue.calc_memory_usage.respond_to?(:call)
63+
@calc_memory_usage = SolidQueue.calc_memory_usage
64+
else
65+
SolidQueue.logger.error { "SolidQueue.calc_memory_usage provider not configured." }
66+
end
67+
end
68+
end
69+
end

lib/solid_queue/worker.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
module SolidQueue
44
class Worker < Processes::Poller
55
include LifecycleHooks
6+
include Processes::Recyclable
67

78
after_boot :run_start_hooks
89
before_shutdown :run_stop_hooks
@@ -11,6 +12,7 @@ class Worker < Processes::Poller
1112

1213
def initialize(**options)
1314
options = options.dup.with_defaults(SolidQueue::Configuration::WORKER_DEFAULTS)
15+
recyclable_setup(**options)
1416

1517
@queues = Array(options[:queues])
1618
@pool = Pool.new(options[:threads], on_idle: -> { wake_up })
@@ -19,14 +21,15 @@ def initialize(**options)
1921
end
2022

2123
def metadata
22-
super.merge(queues: queues.join(","), thread_pool_size: pool.size)
24+
super.then { _1.merge(queues: queues.join(","), thread_pool_size: pool.size) }
25+
.then { oom_configured? ? _1.merge(recycle_on_oom: max_memory) : _1 }
2326
end
2427

2528
private
2629
def poll
2730
claim_executions.then do |executions|
2831
executions.each do |execution|
29-
pool.post(execution)
32+
pool.post(execution, self)
3033
end
3134

3235
executions.size

test/dummy/app/jobs/recycle_job.rb

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# frozen_string_literal: true
2+
3+
class RecycleJob < ApplicationJob
4+
def perform(nap = nil)
5+
sleep(nap) unless nap.nil?
6+
end
7+
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# frozen_string_literal: true
2+
3+
class RecycleWithConcurrencyJob < ApplicationJob
4+
limits_concurrency key: ->(nap = nil) { }
5+
6+
def perform(nap = nil)
7+
sleep(nap) unless nap.nil?
8+
end
9+
end

0 commit comments

Comments
 (0)