diff --git a/Gemfile.lock b/Gemfile.lock index a437893..a83d951 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,9 +2,9 @@ PATH remote: . specs: autobahn (1.4.1-java) - hot_bunnies (~> 1.5) httpclient (~> 2.2) json + march_hare (~> 2.7) GEM remote: https://rubygems.org/ @@ -17,10 +17,10 @@ GEM builder httpclient sinatra - hot_bunnies (1.5.0-java) httpclient (2.2.5) - json (1.8.0-java) + json (1.8.1-java) lz4-ruby (0.2.0-java) + march_hare (2.7.0-java) msgpack-jruby (1.2.0-java) ning-compress-jars (0.9.5-java) rack (1.4.1) diff --git a/README.md b/README.md index 00b74bb..7a20947 100644 --- a/README.md +++ b/README.md @@ -30,4 +30,5 @@ The following example configures transport system that will transport messages J ## Requirements -Runs on JRuby 1.6.7 or newer (probably works with any 1.6.x), uses [HotBunnies](https://github.com/ruby-amqp/hot_bunnies). +* JRuby 1.7.x (might still work with 1.6.x) +* March Hare 2.7 or newer diff --git a/autobahn.gemspec b/autobahn.gemspec index b450d10..eeb85cc 100644 --- a/autobahn.gemspec +++ b/autobahn.gemspec @@ -13,11 +13,11 @@ Gem::Specification.new do |s| s.email = ['platform@burtcorp.com'] s.homepage = 'http://github.com/burtcorp/autobahn' s.summary = %q{Get the most out of RabbitMQ with the least amount of code} - s.description = %q{Autobahn is a transport system abstraction for HotBunnies/RabbitMQ that tries to maximize message throughput rates} + s.description = %q{Autobahn is a transport system abstraction for RabbitMQ/March Hare that tries to maximize message throughput rates} s.rubyforge_project = 'autobahn' - - s.add_dependency 'hot_bunnies', '~> 1.5' + + s.add_dependency 'march_hare', '~> 2.7' s.add_dependency 'httpclient', '~> 2.2' s.add_dependency 'json' diff --git a/lib/autobahn.rb b/lib/autobahn.rb index 799569b..42fc849 100644 --- a/lib/autobahn.rb +++ b/lib/autobahn.rb @@ -15,7 +15,7 @@ class NullLogger end end -require 'hot_bunnies' +require 'march_hare' require 'autobahn/version' require 'autobahn/concurrency' require 'autobahn/cluster' diff --git a/lib/autobahn/consumer.rb b/lib/autobahn/consumer.rb index a4eb394..80d275c 100644 --- a/lib/autobahn/consumer.rb +++ b/lib/autobahn/consumer.rb @@ -53,12 +53,12 @@ def reject(consumer_tag, delivery_tag, options={}) def unsubscribe!(timeout=30) @setup_lock.lock do - if @subscriptions - @subscriptions.each do |subscription| - logger.debug { "Unsubscribing from #{subscription.queue_name}" } - subscription.cancel + if @consumers + @consumers.each do |consumer| + logger.debug { "Unsubscribing from #{consumer.queue_name}" } + consumer.cancel end - @subscriptions = nil + @consumers = nil logger.info do if @demultiplexer.respond_to?(:queued_message_count) "Consumer unsubscribed, #{@demultiplexer.queued_message_count} messages buffered" @@ -109,11 +109,11 @@ def setup! @setup_lock.lock do @logger.warn(%[No queues to subscribe to, transport system is empty]) if @routing.empty? @queues = create_queues - @subscriptions = create_subscriptions - @subscriptions.each do |subscription| - logger.info { sprintf('Subscribing to %s with prefetch %d', subscription.queue_name, @prefetch || 0) } - queue_consumer = QueueingConsumer.new(subscription.channel, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder) - subscription.start(queue_consumer) + @consumers = @queues.map do |queue| + logger.info { sprintf('Subscribing to %s with prefetch %d', queue.name, @prefetch || 0) } + subscription_options = {:ack => true} + queue_consumer = QueueingConsumer.new(queue.channel, queue, subscription_options, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder) + queue.subscribe_with(queue_consumer, subscription_options) end @deliver.set(true) end @@ -128,11 +128,9 @@ def deliver(handler, timeout) end def channels_by_consumer_tag - @channels_by_consumer_tag ||= Hash[@subscriptions.map { |s| [s.consumer_tag, s.channel] }] - end - - def create_subscriptions - @queues.map { |queue| queue.subscribe(:ack => true) } + @channels_by_consumer_tag ||= @consumers.each_with_object({}) do |consumer, acc| + acc[consumer.consumer_tag] = consumer.channel + end end def create_queues @@ -186,15 +184,19 @@ def method_missing(name, *args, &block) end end - class QueueingConsumer < HotBunnies::Queue::BaseConsumer - def initialize(channel, encoder_registry, demultiplexer, options = {}) - super(channel) + class QueueingConsumer < MarchHare::CallbackConsumer + def initialize(channel, queue, subscription_options, encoder_registry, demultiplexer, options = {}) + super(channel, queue, subscription_options, proc {}) @encoder_registry = encoder_registry @demultiplexer = demultiplexer @preferred_decoder = options[:preferred_decoder] @fallback_decoder = options[:fallback_decoder] end + def queue_name + @queue.name + end + def deliver(*pair) headers, encoded_message = pair decoder = begin diff --git a/lib/autobahn/publisher.rb b/lib/autobahn/publisher.rb index 5657e1b..13c2fa7 100644 --- a/lib/autobahn/publisher.rb +++ b/lib/autobahn/publisher.rb @@ -89,7 +89,7 @@ def with_buffer_exclusively(rk) def exchanges_by_routing_key @exchanges_by_routing_key ||= begin - exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :passive => true)] }] + exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :type => :direct, :passive => true)] }] Hash[nodes_by_routing_key.map { |rk, node| [rk, exchanges_by_node[node]] }] end end diff --git a/lib/autobahn/transport_system.rb b/lib/autobahn/transport_system.rb index 46181ac..3ee4d13 100644 --- a/lib/autobahn/transport_system.rb +++ b/lib/autobahn/transport_system.rb @@ -7,7 +7,7 @@ class TransportSystem def initialize(api_uri, exchange_name, options={}) @cluster = (options[:cluster_factory] || Cluster).new(api_uri, options) - @connection_factory = options[:connection_factory] || HotBunnies + @connection_factory = options[:connection_factory] || MarchHare @exchange_name = exchange_name @host_resolver = options[:host_resolver] || DefaultHostResolver.new @encoder = options[:encoder] || StringEncoder.new diff --git a/spec/autobahn/queueing_consumer_spec.rb b/spec/autobahn/queueing_consumer_spec.rb index b2c4b21..7676ea3 100644 --- a/spec/autobahn/queueing_consumer_spec.rb +++ b/spec/autobahn/queueing_consumer_spec.rb @@ -3,12 +3,12 @@ module Autobahn describe QueueingConsumer do - doubles :channel, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message + doubles :channel, :queue, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message let :queueing_consumer do - described_class.new(channel, encoder_registry, demultiplexer) + described_class.new(channel, queue, subscription_options = {}, encoder_registry, demultiplexer) end - + describe '#deliver' do before do headers.stub(:content_type).and_return('application/stuff') diff --git a/spec/autobahn_spec.rb b/spec/autobahn_spec.rb deleted file mode 100644 index 4e768b5..0000000 --- a/spec/autobahn_spec.rb +++ /dev/null @@ -1 +0,0 @@ -# \ No newline at end of file diff --git a/spec/integration/autobahn_intg_spec.rb b/spec/integration/autobahn_intg_spec.rb index a6c6682..fa52072 100644 --- a/spec/integration/autobahn_intg_spec.rb +++ b/spec/integration/autobahn_intg_spec.rb @@ -34,7 +34,7 @@ def create_transport_system(name, options={}) before :all do begin NUM_NODES.times do |i| - connection = HotBunnies.connect(:host => HOSTNAME, :port => BASE_PORT + i) + connection = MarchHare.connect(:host => HOSTNAME, :port => BASE_PORT + i) channel = connection.create_channel exchange = channel.exchange(EXCHANGE_NAME, :type => :direct) NUM_QUEUES_PER_NODE.times do |j| @@ -59,8 +59,8 @@ def create_transport_system(name, options={}) end before :all do - @connection = HotBunnies.connect(:port => BASE_PORT) - @exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true) + @connection = MarchHare.connect(:port => BASE_PORT) + @exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true, :type => :direct) @queues = NUM_QUEUES.times.map do |i| @connection.create_channel.queue(QUEUE_NAMES[i], :passive => true) end @@ -341,15 +341,10 @@ def create_transport_system(name, options={}) @consumer.disconnect! @consumer = @transport_system.consumer(:demultiplexer => demultiplexer) @consumer.subscribe(:mode => :noop) - messages = [] - 12.times do |i| - headers, message = demultiplexer.take_next(i % 3) - if headers - messages << message - headers.ack - end - end - messages.should have(12).items + _, msg0 = demultiplexer.take_next(0) + _, msg1 = demultiplexer.take_next(1) + _, msg2 = demultiplexer.take_next(2) + [msg0, msg1, msg2].compact.should_not be_empty end end