Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fair balance #1

Open
wants to merge 31 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
*.swp
.bundle
.rvmrc
.idea
Gemfile.lock
bin
coverage
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ group :test do

gem "simplecov", ">= 0.20.0", require: false
gem "simplecov-lcov", ">= 0.8.0", require: false
gem 'pry'
end

group :rubocop do
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,7 @@ migration to add a column to your delayed_jobs table.
rake db:migrate

That's it. Use [delayed_job as normal](http://github.com/collectiveidea/delayed_job).

## Enabling Fair Balancer

Run `rails g delayed_job:fair_id && rake db:migrate`
2 changes: 1 addition & 1 deletion delayed_job_active_record.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ Gem::Specification.new do |spec|
spec.name = "delayed_job_active_record"
spec.require_paths = ["lib"]
spec.summary = "ActiveRecord backend for DelayedJob"
spec.version = "4.1.5"
spec.version = "4.1.5p24"
end
9 changes: 7 additions & 2 deletions lib/delayed/backend/active_record.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# frozen_string_literal: true

require "active_record/version"
require_relative './active_record/fair_sql/service'

module Delayed
module Backend
module ActiveRecord
Expand All @@ -12,8 +14,9 @@ def initialize
end

def reserve_sql_strategy=(val)
if !(val == :optimized_sql || val == :default_sql)
raise ArgumentError, "allowed values are :optimized_sql or :default_sql"

unless %i(optimized_sql default_sql fair_sql).include?(val)
raise ArgumentError, "allowed values are :optimized_sql or :default_sql, :fair_sql"
end

@reserve_sql_strategy = val
Expand Down Expand Up @@ -94,6 +97,8 @@ def self.reserve_with_scope(ready_scope, worker, now)
# See https://github.com/collectiveidea/delayed_job_active_record/pull/89 for more details.
when :default_sql
reserve_with_scope_using_default_sql(ready_scope, worker, now)
when :fair_sql
Delayed::Backend::ActiveRecord::FairSql::Service.reserve(ready_scope, worker, now)
end
end

Expand Down
35 changes: 35 additions & 0 deletions lib/delayed/backend/active_record/fair_sql/rank.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
module Delayed
module Backend
module ActiveRecord
module FairSql
class Rank < ::ActiveRecord::Base
self.table_name = 'delayed_jobs_fair_ranks'

NEGATIVE_RANK = -10**6
SYSTEM_RECORD = 'DJ_SYSTEM_PRIMARY'.freeze

scope :not_system, -> { where.not(fair_id: SYSTEM_RECORD) }
scope :current_ranks, ->{ not_system.where(timestamp: current_timestamp!) }

class << self
def system_record!
where(fair_id: SYSTEM_RECORD, rank: NEGATIVE_RANK).order(timestamp: :desc).first_or_create!
end

def current_timestamp!
system_record!.timestamp
end

def update_current_timestamp!(timestamp)
system_record!.update!(timestamp: timestamp)
end

def clean_outdated!
where('timestamp < ?', current_timestamp!).delete_all
end
end
end
end
end
end
end
108 changes: 108 additions & 0 deletions lib/delayed/backend/active_record/fair_sql/service.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
# frozen_string_literal: true

require_relative './rank'

module Delayed
module Backend
module ActiveRecord
module FairSql
class Service
class << self
RANK_SAVING_BATCH = 200
JOIN_LIMIT = 100
BUSY = -100

attr_accessor :queues, :max_busy

def reserve(ready_scope, worker, now)
raise ArgumentError, ":fair_sql is allowed only for MySQL" unless job_klass.connection.adapter_name.downcase.include?('mysql')
scope = apply_ranks(ready_scope)
job_klass.reserve_with_scope_using_optimized_sql(scope, worker, now)
end

def apply_ranks(scope)
if queues.nil? || (self.queues & Worker.queues).size > 0
top_ranks = "SELECT * FROM delayed_jobs_fair_ranks WHERE timestamp = #{rank_klass.current_timestamp!} ORDER BY delayed_jobs_fair_ranks.rank DESC LIMIT #{JOIN_LIMIT}"
scope = scope.joins("LEFT JOIN (#{top_ranks}) AS ranks ON ranks.fair_id = delayed_jobs.fair_id")
scope = scope.select(select_grouped).group(:fair_id).distinct
if self.max_busy
scope = scope.where('ranks.rank >= ?', self.max_busy * BUSY + BUSY)
end
scope = scope.reorder("delayed_jobs.priority ASC, ranks.rank DESC, #{rand_order}")
scope
else
scope = scope.reorder("delayed_jobs.priority, #{rand_order}")
scope
end
end

def select_grouped
columns = %w[delayed_jobs.id priority attempts handler last_error run_at locked_at failed_at locked_by queue delayed_jobs.fair_id]
columns.map { |c| "ANY_VALUE(#{c}) as #{c.split('.').last}" }.join(',')
end

def recalculate_ranks!(timestamp = newest_timestamp)
ranks = calculate_ranks(timestamp)

ranks.each_slice(RANK_SAVING_BATCH).each do |g|
rank_klass.create(g)
end

rank_klass.update_current_timestamp!(timestamp)
ranks.map { |r| r.except(:timestamp) }
end

def calculate_ranks(timestamp = newest_timestamp)
scope = job_klass
scope = scope.where(queue: self.queues) if self.queues.present? && self.queues.size > 0

stats = scope.where(last_error: nil).group(:fair_id).select(
[
'fair_id',
'sum(case when locked_at IS NOT NULL then 1 else 0 end) as b',
'sum(case when locked_at IS NOT NULL then 0 else 1 end) as w'
].join(',')
)

stats.map do |st|
{ fair_id: st.fair_id, busy: st.b, waiting: st.w, rank: calc_rank(st.b, st.w), timestamp: timestamp }
end
end

def calc_rank(busy, waiting)
rank = (busy * BUSY)
rank -= 1 if busy > 0 && waiting > 0
rank
end

def rand_order
'rand()'
end

def fetch_ranks
rank_klass.current_ranks.map do |r|
{ fair_id: r.fair_id, busy: r.busy, waiting: r.waiting, rank: r.rank }
end
end

def clean_ranks
rank_klass.clean_outdated!
end

def newest_timestamp
Time.now.utc.to_i
end

def rank_klass
Delayed::Backend::ActiveRecord::FairSql::Rank
end

def job_klass
Delayed::Backend::ActiveRecord::Job
end
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/generators/delayed_job/active_record_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ActiveRecordGenerator < ::DelayedJobGenerator
source_paths << File.join(File.dirname(__FILE__), "templates")

def create_migration_file
migration_template "migration.rb", "db/migrate/create_delayed_jobs.rb", migration_version: migration_version
migration_template "migration.rb.tmpl", "db/migrate/create_delayed_jobs.rb", migration_version: migration_version
end

def self.next_migration_number(dirname)
Expand Down
22 changes: 22 additions & 0 deletions lib/generators/delayed_job/fair_id_generator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# frozen_string_literal: true

require "generators/delayed_job/delayed_job_generator"
require "generators/delayed_job/next_migration_version"
require "rails/generators/migration"
require "rails/generators/active_record"
require_relative './active_record_generator'

# Extend the DelayedJobGenerator so that it creates an AR migration
module DelayedJob
class FairIdGenerator < ActiveRecordGenerator
def create_migration_file
migration_template(
"add_fair_id_migration.rb.tmpl",
"db/migrate/add_fair_id_to_delayed_jobs.rb",
migration_version: migration_version
)
end

def create_executable_file; end
end
end
21 changes: 21 additions & 0 deletions lib/generators/delayed_job/templates/add_fair_id_migration.rb.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
class AddFairIdToDelayedJobs < ActiveRecord::Migration<%= migration_version %>
def self.up
add_column :delayed_jobs, :fair_id, :string
add_index :delayed_jobs, [:fair_id], name: "delayed_jobs_fair_id"

create_table :delayed_jobs_fair_ranks do |table|
table.string :fair_id
table.integer :busy, default: 0
table.integer :waiting, default: 0
table.integer :rank, default: 0
table.integer :timestamp, default: 0
end

add_index :delayed_jobs_fair_ranks, [:fair_id, :rank, :timestamp], name: "delayed_jobs_fair_ranks_index"
end

def self.down
remove_column :delayed_jobs, :fair_id
drop_table :delayed_jobs_fair_ranks
end
end
2 changes: 1 addition & 1 deletion lib/generators/delayed_job/upgrade_generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module DelayedJob
class UpgradeGenerator < ActiveRecordGenerator
def create_migration_file
migration_template(
"upgrade_migration.rb",
"upgrade_migration.rb.tmpl",
"db/migrate/add_queue_to_delayed_jobs.rb",
migration_version: migration_version
)
Expand Down
25 changes: 25 additions & 0 deletions spec/delayed/backend/active_record_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,31 @@
expect(Delayed::Backend::ActiveRecord::Job).to have_received(:reserve_with_scope_using_default_sql).once
end
end

context "with reserve_sql_strategy option set to :fair_sql" do
let(:dbms) { "MySQL" }
let(:reserve_sql_strategy) { :fair_sql }

it "uses the fair sql version" do
expect(scope).to receive(:joins) { scope }
expect(scope).to receive(:select) { scope }
expect(scope).to receive(:group) { scope }
expect(scope).to receive(:distinct) { scope }
expect(scope).to receive(:reorder) { scope }

allow(Delayed::Backend::ActiveRecord::Job).to receive(:reserve_with_scope_using_optimized_sql)
Delayed::Backend::ActiveRecord::Job.reserve_with_scope(scope, worker, Time.current)
expect(Delayed::Backend::ActiveRecord::Job).to have_received(:reserve_with_scope_using_optimized_sql).once
end

context 'when not MySQL' do
let(:dbms) { "OtherDB" }

it "raises error" do
expect { Delayed::Backend::ActiveRecord::Job.reserve_with_scope(scope, worker, Time.current) }.to raise_error(ArgumentError)
end
end
end
end

context "db_time_now" do
Expand Down
Loading