Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Upgrade to March Hare #27

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ PATH
remote: .
specs:
autobahn (1.4.1-java)
hot_bunnies (~> 1.5)
httpclient (~> 2.2)
json
march_hare (~> 2.7)

GEM
remote: https://rubygems.org/
Expand All @@ -17,10 +17,10 @@ GEM
builder
httpclient
sinatra
hot_bunnies (1.5.0-java)
httpclient (2.2.5)
json (1.8.0-java)
json (1.8.1-java)
lz4-ruby (0.2.0-java)
march_hare (2.7.0-java)
msgpack-jruby (1.2.0-java)
ning-compress-jars (0.9.5-java)
rack (1.4.1)
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ The following example configures transport system that will transport messages J

## Requirements

Runs on JRuby 1.6.7 or newer (probably works with any 1.6.x), uses [HotBunnies](https://github.com/ruby-amqp/hot_bunnies).
* JRuby 1.7.x (might still work with 1.6.x)
* March Hare 2.7 or newer
6 changes: 3 additions & 3 deletions autobahn.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ Gem::Specification.new do |s|
s.email = ['[email protected]']
s.homepage = 'http://github.com/burtcorp/autobahn'
s.summary = %q{Get the most out of RabbitMQ with the least amount of code}
s.description = %q{Autobahn is a transport system abstraction for HotBunnies/RabbitMQ that tries to maximize message throughput rates}
s.description = %q{Autobahn is a transport system abstraction for RabbitMQ/March Hare that tries to maximize message throughput rates}

s.rubyforge_project = 'autobahn'
s.add_dependency 'hot_bunnies', '~> 1.5'

s.add_dependency 'march_hare', '~> 2.7'
s.add_dependency 'httpclient', '~> 2.2'
s.add_dependency 'json'

Expand Down
2 changes: 1 addition & 1 deletion lib/autobahn.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class NullLogger
end
end

require 'hot_bunnies'
require 'march_hare'
require 'autobahn/version'
require 'autobahn/concurrency'
require 'autobahn/cluster'
Expand Down
38 changes: 20 additions & 18 deletions lib/autobahn/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ def reject(consumer_tag, delivery_tag, options={})

def unsubscribe!(timeout=30)
@setup_lock.lock do
if @subscriptions
@subscriptions.each do |subscription|
logger.debug { "Unsubscribing from #{subscription.queue_name}" }
subscription.cancel
if @consumers
@consumers.each do |consumer|
logger.debug { "Unsubscribing from #{consumer.queue_name}" }
consumer.cancel
end
@subscriptions = nil
@consumers = nil
logger.info do
if @demultiplexer.respond_to?(:queued_message_count)
"Consumer unsubscribed, #{@demultiplexer.queued_message_count} messages buffered"
Expand Down Expand Up @@ -109,11 +109,11 @@ def setup!
@setup_lock.lock do
@logger.warn(%[No queues to subscribe to, transport system is empty]) if @routing.empty?
@queues = create_queues
@subscriptions = create_subscriptions
@subscriptions.each do |subscription|
logger.info { sprintf('Subscribing to %s with prefetch %d', subscription.queue_name, @prefetch || 0) }
queue_consumer = QueueingConsumer.new(subscription.channel, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder)
subscription.start(queue_consumer)
@consumers = @queues.map do |queue|
logger.info { sprintf('Subscribing to %s with prefetch %d', queue.name, @prefetch || 0) }
subscription_options = {:ack => true}
queue_consumer = QueueingConsumer.new(queue.channel, queue, subscription_options, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder)
queue.subscribe_with(queue_consumer, subscription_options)
end
@deliver.set(true)
end
Expand All @@ -128,11 +128,9 @@ def deliver(handler, timeout)
end

def channels_by_consumer_tag
@channels_by_consumer_tag ||= Hash[@subscriptions.map { |s| [s.consumer_tag, s.channel] }]
end

def create_subscriptions
@queues.map { |queue| queue.subscribe(:ack => true) }
@channels_by_consumer_tag ||= @consumers.each_with_object({}) do |consumer, acc|
acc[consumer.consumer_tag] = consumer.channel
end
end

def create_queues
Expand Down Expand Up @@ -186,15 +184,19 @@ def method_missing(name, *args, &block)
end
end

class QueueingConsumer < HotBunnies::Queue::BaseConsumer
def initialize(channel, encoder_registry, demultiplexer, options = {})
super(channel)
class QueueingConsumer < MarchHare::CallbackConsumer
def initialize(channel, queue, subscription_options, encoder_registry, demultiplexer, options = {})
super(channel, queue, subscription_options, proc {})
@encoder_registry = encoder_registry
@demultiplexer = demultiplexer
@preferred_decoder = options[:preferred_decoder]
@fallback_decoder = options[:fallback_decoder]
end

def queue_name
@queue.name
end

def deliver(*pair)
headers, encoded_message = pair
decoder = begin
Expand Down
2 changes: 1 addition & 1 deletion lib/autobahn/publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def with_buffer_exclusively(rk)

def exchanges_by_routing_key
@exchanges_by_routing_key ||= begin
exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :passive => true)] }]
exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :type => :direct, :passive => true)] }]
Hash[nodes_by_routing_key.map { |rk, node| [rk, exchanges_by_node[node]] }]
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/autobahn/transport_system.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TransportSystem

def initialize(api_uri, exchange_name, options={})
@cluster = (options[:cluster_factory] || Cluster).new(api_uri, options)
@connection_factory = options[:connection_factory] || HotBunnies
@connection_factory = options[:connection_factory] || MarchHare
@exchange_name = exchange_name
@host_resolver = options[:host_resolver] || DefaultHostResolver.new
@encoder = options[:encoder] || StringEncoder.new
Expand Down
6 changes: 3 additions & 3 deletions spec/autobahn/queueing_consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

module Autobahn
describe QueueingConsumer do
doubles :channel, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message
doubles :channel, :queue, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message

let :queueing_consumer do
described_class.new(channel, encoder_registry, demultiplexer)
described_class.new(channel, queue, subscription_options = {}, encoder_registry, demultiplexer)
end

describe '#deliver' do
before do
headers.stub(:content_type).and_return('application/stuff')
Expand Down
1 change: 0 additions & 1 deletion spec/autobahn_spec.rb

This file was deleted.

19 changes: 7 additions & 12 deletions spec/integration/autobahn_intg_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_transport_system(name, options={})
before :all do
begin
NUM_NODES.times do |i|
connection = HotBunnies.connect(:host => HOSTNAME, :port => BASE_PORT + i)
connection = MarchHare.connect(:host => HOSTNAME, :port => BASE_PORT + i)
channel = connection.create_channel
exchange = channel.exchange(EXCHANGE_NAME, :type => :direct)
NUM_QUEUES_PER_NODE.times do |j|
Expand All @@ -59,8 +59,8 @@ def create_transport_system(name, options={})
end

before :all do
@connection = HotBunnies.connect(:port => BASE_PORT)
@exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true)
@connection = MarchHare.connect(:port => BASE_PORT)
@exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true, :type => :direct)
@queues = NUM_QUEUES.times.map do |i|
@connection.create_channel.queue(QUEUE_NAMES[i], :passive => true)
end
Expand Down Expand Up @@ -341,15 +341,10 @@ def create_transport_system(name, options={})
@consumer.disconnect!
@consumer = @transport_system.consumer(:demultiplexer => demultiplexer)
@consumer.subscribe(:mode => :noop)
messages = []
12.times do |i|
headers, message = demultiplexer.take_next(i % 3)
if headers
messages << message
headers.ack
end
end
messages.should have(12).items
_, msg0 = demultiplexer.take_next(0)
_, msg1 = demultiplexer.take_next(1)
_, msg2 = demultiplexer.take_next(2)
[msg0, msg1, msg2].compact.should_not be_empty
end
end

Expand Down