From e0159912551889abad6f28e59de0000a25d9ad13 Mon Sep 17 00:00:00 2001 From: Dino Maric Date: Mon, 16 Sep 2024 11:44:19 +0200 Subject: [PATCH] Add `ActiveJob` adapter Implements adapter for `ActiveJob` inside the gem. This adapter is defined only if rails version is >=8, for earlier rails versions adapter is definied inside the Rails itself --- kicks.gemspec | 2 + .../queue_adapters/sneakers_adapter.rb | 39 +++++++++++ lib/sneakers.rb | 2 + spec/fixtures/test_job.rb | 13 ++++ spec/sneakers/active_job_integration_spec.rb | 66 +++++++++++++++++++ 5 files changed, 122 insertions(+) create mode 100644 lib/active_job/queue_adapters/sneakers_adapter.rb create mode 100644 spec/fixtures/test_job.rb create mode 100644 spec/sneakers/active_job_integration_spec.rb diff --git a/kicks.gemspec b/kicks.gemspec index cbd20f3..298306b 100644 --- a/kicks.gemspec +++ b/kicks.gemspec @@ -31,6 +31,8 @@ Gem::Specification.new do |gem| gem.add_dependency 'rake', '>= 12.3', '< 14.0' # for integration environment (see .travis.yml and integration_spec) + gem.add_development_dependency 'activejob', '>= 7.1' + gem.add_development_dependency 'activesupport', '>= 7.1' gem.add_development_dependency 'rabbitmq_http_api_client' gem.add_development_dependency 'redis' diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb new file mode 100644 index 0000000..671f9dc --- /dev/null +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -0,0 +1,39 @@ +module ActiveJob + module QueueAdapters + # Explicitly remove the implementation existing in older rails'. + remove_const(:SneakersAdapter) if defined?(:SneakersAdapter) + + # = Sneakers adapter for Active Job + # + # To use Sneakers set the queue_adapter config to +:sneakers+. + # + # Rails.application.config.active_job.queue_adapter = :sneakers + class SneakersAdapter < ::ActiveJob::QueueAdapters::AbstractAdapter + def initialize + @monitor = Monitor.new + end + + def enqueue(job) + @monitor.synchronize do + JobWrapper.from_queue job.queue_name + JobWrapper.enqueue ActiveSupport::JSON.encode(job.serialize) + end + end + + def enqueue_at(job, timestamp) + raise NotImplementedError, 'This queueing backend does not support scheduling jobs.' + end + + class JobWrapper + include Sneakers::Worker + from_queue 'default' + + def work(msg) + job_data = ActiveSupport::JSON.decode(msg) + Base.execute job_data + ack! + end + end + end + end +end diff --git a/lib/sneakers.rb b/lib/sneakers.rb index 0ce0dc5..8f2cd37 100644 --- a/lib/sneakers.rb +++ b/lib/sneakers.rb @@ -22,6 +22,8 @@ module Concerns require 'sneakers/middleware/config' require 'sneakers/worker' require 'sneakers/publisher' +require 'active_job/queue_adapters/sneakers_adapter' if defined?(ActiveJob) + module Sneakers extend self diff --git a/spec/fixtures/test_job.rb b/spec/fixtures/test_job.rb new file mode 100644 index 0000000..ac8056f --- /dev/null +++ b/spec/fixtures/test_job.rb @@ -0,0 +1,13 @@ +require 'sneakers' +require 'redis' + +redis_addr = compose_or_localhost('redis') +puts "REDIS is at #{redis_addr}" +$redis = Redis.new(host: redis_addr) + + +class TestJob < ActiveJob::Base + def perform(message) + $redis.incr('rails_active_job') + end +end diff --git a/spec/sneakers/active_job_integration_spec.rb b/spec/sneakers/active_job_integration_spec.rb new file mode 100644 index 0000000..c94476c --- /dev/null +++ b/spec/sneakers/active_job_integration_spec.rb @@ -0,0 +1,66 @@ +require 'spec_helper' +require 'sneakers' +require 'sneakers/runner' +require 'rabbitmq/http/client' +require 'active_job' +require 'active_job/queue_adapters/sneakers_adapter' +require 'fixtures/test_job' + +describe 'ActiveJob integration' do + before :each do + skip unless ENV['INTEGRATION'] + prepare + end + + def integration_log(msg) + puts msg if ENV['INTEGRATION_LOG'] + end + + def rmq_addr + @rmq_addr ||= compose_or_localhost('rabbitmq') + end + + def prepare + ActiveJob::Base.queue_adapter = :sneakers + + Sneakers.clear! + Sneakers.configure(amqp: "amqp://guest:guest@#{rmq_addr}:5672") + Sneakers.logger.level = Logger::ERROR + + redis_addr = compose_or_localhost('redis') + @redis = Redis.new(host: redis_addr) + @redis.del('rails_active_job') + end + + def wait_for_jobs_to_finish + sleep 5 + end + + def start_active_job_workers + integration_log 'starting ActiveJob workers.' + runner = Sneakers::Runner.new([ActiveJob::QueueAdapters::SneakersAdapter::JobWrapper], {}) + + pid = fork { runner.run } + + integration_log 'waiting for workers to stabilize (5s).' + sleep 5 + + yield if block_given? + ensure + Process.kill('TERM', pid) rescue nil + end + + it 'runs jobs enqueued on a listening queue' do + start_active_job_workers do + TestJob.perform_later('Hello Rails!') + wait_for_jobs_to_finish + assert_equal @redis.get('rails_active_job').to_i, 1 + end + end + + it 'scheduling jobs are not supported' do + assert_raises NotImplementedError, 'This queueing backend does not support scheduling jobs.' do + TestJob.set(wait: 1.second).perform_later('Say Hello to Rails later!') + end + end +end