Skip to content

Commit

Permalink
Add ActiveJob adapter
Browse files Browse the repository at this point in the history
Implements adapter for `ActiveJob` inside the gem.
This adapter is defined only if rails version is >=8,
for earlier rails versions adapter is definied inside the Rails itself
  • Loading branch information
dixpac authored and simi committed Oct 16, 2024
1 parent 40db4fb commit e015991
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 0 deletions.
2 changes: 2 additions & 0 deletions kicks.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Gem::Specification.new do |gem|
gem.add_dependency 'rake', '>= 12.3', '< 14.0'

# for integration environment (see .travis.yml and integration_spec)
gem.add_development_dependency 'activejob', '>= 7.1'
gem.add_development_dependency 'activesupport', '>= 7.1'
gem.add_development_dependency 'rabbitmq_http_api_client'
gem.add_development_dependency 'redis'

Expand Down
39 changes: 39 additions & 0 deletions lib/active_job/queue_adapters/sneakers_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module ActiveJob
module QueueAdapters
# Explicitly remove the implementation existing in older rails'.
remove_const(:SneakersAdapter) if defined?(:SneakersAdapter)

# = Sneakers adapter for Active Job
#
# To use Sneakers set the queue_adapter config to +:sneakers+.
#
# Rails.application.config.active_job.queue_adapter = :sneakers
class SneakersAdapter < ::ActiveJob::QueueAdapters::AbstractAdapter
def initialize
@monitor = Monitor.new
end

def enqueue(job)
@monitor.synchronize do
JobWrapper.from_queue job.queue_name
JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize)
end
end

def enqueue_at(job, timestamp)
raise NotImplementedError, 'This queueing backend does not support scheduling jobs.'
end

class JobWrapper
include Sneakers::Worker
from_queue 'default'

def work(msg)
job_data = ActiveSupport::JSON.decode(msg)
Base.execute job_data
ack!
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/sneakers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ module Concerns
require 'sneakers/middleware/config'
require 'sneakers/worker'
require 'sneakers/publisher'
require 'active_job/queue_adapters/sneakers_adapter' if defined?(ActiveJob)


module Sneakers
extend self
Expand Down
13 changes: 13 additions & 0 deletions spec/fixtures/test_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
require 'sneakers'
require 'redis'

redis_addr = compose_or_localhost('redis')
puts "REDIS is at #{redis_addr}"
$redis = Redis.new(host: redis_addr)


class TestJob < ActiveJob::Base
def perform(message)
$redis.incr('rails_active_job')
end
end
66 changes: 66 additions & 0 deletions spec/sneakers/active_job_integration_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
require 'spec_helper'
require 'sneakers'
require 'sneakers/runner'
require 'rabbitmq/http/client'
require 'active_job'
require 'active_job/queue_adapters/sneakers_adapter'
require 'fixtures/test_job'

describe 'ActiveJob integration' do
before :each do
skip unless ENV['INTEGRATION']
prepare
end

def integration_log(msg)
puts msg if ENV['INTEGRATION_LOG']
end

def rmq_addr
@rmq_addr ||= compose_or_localhost('rabbitmq')
end

def prepare
ActiveJob::Base.queue_adapter = :sneakers

Sneakers.clear!
Sneakers.configure(amqp: "amqp://guest:guest@#{rmq_addr}:5672")
Sneakers.logger.level = Logger::ERROR

redis_addr = compose_or_localhost('redis')
@redis = Redis.new(host: redis_addr)
@redis.del('rails_active_job')
end

def wait_for_jobs_to_finish
sleep 5
end

def start_active_job_workers
integration_log 'starting ActiveJob workers.'
runner = Sneakers::Runner.new([ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper], {})

pid = fork { runner.run }

integration_log 'waiting for workers to stabilize (5s).'
sleep 5

yield if block_given?
ensure
Process.kill('TERM', pid) rescue nil
end

it 'runs jobs enqueued on a listening queue' do
start_active_job_workers do
TestJob.perform_later('Hello Rails!')
wait_for_jobs_to_finish
assert_equal @redis.get('rails_active_job').to_i, 1
end
end

it 'scheduling jobs are not supported' do
assert_raises NotImplementedError, 'This queueing backend does not support scheduling jobs.' do
TestJob.set(wait: 1.second).perform_later('Say Hello to Rails later!')
end
end
end

0 comments on commit e015991

Please sign in to comment.