From 40870b674ec728a78b4f6bff97ce258c155725cf Mon Sep 17 00:00:00 2001 From: Theo Date: Tue, 16 Dec 2014 10:27:23 +0100 Subject: [PATCH 1/4] Fix a non-deterministic integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There was no guarantee that the messages would be evenly divided between the three buckets, but that was what the test was asserting. This version just checks that at least one bucket got at least one message – because the point of the test is just to show that custom demultiplexers can be used, not that the specific demultiplexer works. --- spec/integration/autobahn_intg_spec.rb | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/spec/integration/autobahn_intg_spec.rb b/spec/integration/autobahn_intg_spec.rb index a6c6682..92d8ca9 100644 --- a/spec/integration/autobahn_intg_spec.rb +++ b/spec/integration/autobahn_intg_spec.rb @@ -341,15 +341,10 @@ def create_transport_system(name, options={}) @consumer.disconnect! @consumer = @transport_system.consumer(:demultiplexer => demultiplexer) @consumer.subscribe(:mode => :noop) - messages = [] - 12.times do |i| - headers, message = demultiplexer.take_next(i % 3) - if headers - messages << message - headers.ack - end - end - messages.should have(12).items + _, msg0 = demultiplexer.take_next(0) + _, msg1 = demultiplexer.take_next(1) + _, msg2 = demultiplexer.take_next(2) + [msg0, msg1, msg2].compact.should_not be_empty end end From c59c35899b18bc0c9b8ad060d3f696505601c540 Mon Sep 17 00:00:00 2001 From: Theo Date: Tue, 16 Dec 2014 10:29:32 +0100 Subject: [PATCH 2/4] Remove an empty spec file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It’s been here since the first commit, empty. --- spec/autobahn_spec.rb | 1 - 1 file changed, 1 deletion(-) delete mode 100644 spec/autobahn_spec.rb diff --git a/spec/autobahn_spec.rb b/spec/autobahn_spec.rb deleted file mode 100644 index 4e768b5..0000000 --- a/spec/autobahn_spec.rb +++ /dev/null @@ -1 +0,0 @@ -# \ No newline at end of file From ba8459230bc1dd723a659f4ce7bdeff8be2156f4 Mon Sep 17 00:00:00 2001 From: Theo Date: Tue, 16 Dec 2014 11:10:31 +0100 Subject: [PATCH 3/4] Upgrade to MarchHare 2.7 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It’s been due for a while. --- Gemfile.lock | 6 ++-- autobahn.gemspec | 4 +-- lib/autobahn.rb | 2 +- lib/autobahn/consumer.rb | 38 +++++++++++++------------ lib/autobahn/publisher.rb | 2 +- lib/autobahn/transport_system.rb | 2 +- spec/autobahn/queueing_consumer_spec.rb | 6 ++-- spec/integration/autobahn_intg_spec.rb | 6 ++-- 8 files changed, 34 insertions(+), 32 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index a437893..a83d951 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,9 +2,9 @@ PATH remote: . specs: autobahn (1.4.1-java) - hot_bunnies (~> 1.5) httpclient (~> 2.2) json + march_hare (~> 2.7) GEM remote: https://rubygems.org/ @@ -17,10 +17,10 @@ GEM builder httpclient sinatra - hot_bunnies (1.5.0-java) httpclient (2.2.5) - json (1.8.0-java) + json (1.8.1-java) lz4-ruby (0.2.0-java) + march_hare (2.7.0-java) msgpack-jruby (1.2.0-java) ning-compress-jars (0.9.5-java) rack (1.4.1) diff --git a/autobahn.gemspec b/autobahn.gemspec index b450d10..7f121a0 100644 --- a/autobahn.gemspec +++ b/autobahn.gemspec @@ -16,8 +16,8 @@ Gem::Specification.new do |s| s.description = %q{Autobahn is a transport system abstraction for HotBunnies/RabbitMQ that tries to maximize message throughput rates} s.rubyforge_project = 'autobahn' - - s.add_dependency 'hot_bunnies', '~> 1.5' + + s.add_dependency 'march_hare', '~> 2.7' s.add_dependency 'httpclient', '~> 2.2' s.add_dependency 'json' diff --git a/lib/autobahn.rb b/lib/autobahn.rb index 799569b..42fc849 100644 --- a/lib/autobahn.rb +++ b/lib/autobahn.rb @@ -15,7 +15,7 @@ class NullLogger end end -require 'hot_bunnies' +require 'march_hare' require 'autobahn/version' require 'autobahn/concurrency' require 'autobahn/cluster' diff --git a/lib/autobahn/consumer.rb b/lib/autobahn/consumer.rb index a4eb394..80d275c 100644 --- a/lib/autobahn/consumer.rb +++ b/lib/autobahn/consumer.rb @@ -53,12 +53,12 @@ def reject(consumer_tag, delivery_tag, options={}) def unsubscribe!(timeout=30) @setup_lock.lock do - if @subscriptions - @subscriptions.each do |subscription| - logger.debug { "Unsubscribing from #{subscription.queue_name}" } - subscription.cancel + if @consumers + @consumers.each do |consumer| + logger.debug { "Unsubscribing from #{consumer.queue_name}" } + consumer.cancel end - @subscriptions = nil + @consumers = nil logger.info do if @demultiplexer.respond_to?(:queued_message_count) "Consumer unsubscribed, #{@demultiplexer.queued_message_count} messages buffered" @@ -109,11 +109,11 @@ def setup! @setup_lock.lock do @logger.warn(%[No queues to subscribe to, transport system is empty]) if @routing.empty? @queues = create_queues - @subscriptions = create_subscriptions - @subscriptions.each do |subscription| - logger.info { sprintf('Subscribing to %s with prefetch %d', subscription.queue_name, @prefetch || 0) } - queue_consumer = QueueingConsumer.new(subscription.channel, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder) - subscription.start(queue_consumer) + @consumers = @queues.map do |queue| + logger.info { sprintf('Subscribing to %s with prefetch %d', queue.name, @prefetch || 0) } + subscription_options = {:ack => true} + queue_consumer = QueueingConsumer.new(queue.channel, queue, subscription_options, @encoder_registry, @demultiplexer, :preferred_decoder => @preferred_decoder, :fallback_decoder => @fallback_decoder) + queue.subscribe_with(queue_consumer, subscription_options) end @deliver.set(true) end @@ -128,11 +128,9 @@ def deliver(handler, timeout) end def channels_by_consumer_tag - @channels_by_consumer_tag ||= Hash[@subscriptions.map { |s| [s.consumer_tag, s.channel] }] - end - - def create_subscriptions - @queues.map { |queue| queue.subscribe(:ack => true) } + @channels_by_consumer_tag ||= @consumers.each_with_object({}) do |consumer, acc| + acc[consumer.consumer_tag] = consumer.channel + end end def create_queues @@ -186,15 +184,19 @@ def method_missing(name, *args, &block) end end - class QueueingConsumer < HotBunnies::Queue::BaseConsumer - def initialize(channel, encoder_registry, demultiplexer, options = {}) - super(channel) + class QueueingConsumer < MarchHare::CallbackConsumer + def initialize(channel, queue, subscription_options, encoder_registry, demultiplexer, options = {}) + super(channel, queue, subscription_options, proc {}) @encoder_registry = encoder_registry @demultiplexer = demultiplexer @preferred_decoder = options[:preferred_decoder] @fallback_decoder = options[:fallback_decoder] end + def queue_name + @queue.name + end + def deliver(*pair) headers, encoded_message = pair decoder = begin diff --git a/lib/autobahn/publisher.rb b/lib/autobahn/publisher.rb index 5657e1b..13c2fa7 100644 --- a/lib/autobahn/publisher.rb +++ b/lib/autobahn/publisher.rb @@ -89,7 +89,7 @@ def with_buffer_exclusively(rk) def exchanges_by_routing_key @exchanges_by_routing_key ||= begin - exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :passive => true)] }] + exchanges_by_node = Hash[nodes_by_routing_key.values.uniq.map { |node| [node, @connections[node].create_channel.exchange(@exchange_name, :type => :direct, :passive => true)] }] Hash[nodes_by_routing_key.map { |rk, node| [rk, exchanges_by_node[node]] }] end end diff --git a/lib/autobahn/transport_system.rb b/lib/autobahn/transport_system.rb index 46181ac..3ee4d13 100644 --- a/lib/autobahn/transport_system.rb +++ b/lib/autobahn/transport_system.rb @@ -7,7 +7,7 @@ class TransportSystem def initialize(api_uri, exchange_name, options={}) @cluster = (options[:cluster_factory] || Cluster).new(api_uri, options) - @connection_factory = options[:connection_factory] || HotBunnies + @connection_factory = options[:connection_factory] || MarchHare @exchange_name = exchange_name @host_resolver = options[:host_resolver] || DefaultHostResolver.new @encoder = options[:encoder] || StringEncoder.new diff --git a/spec/autobahn/queueing_consumer_spec.rb b/spec/autobahn/queueing_consumer_spec.rb index b2c4b21..7676ea3 100644 --- a/spec/autobahn/queueing_consumer_spec.rb +++ b/spec/autobahn/queueing_consumer_spec.rb @@ -3,12 +3,12 @@ module Autobahn describe QueueingConsumer do - doubles :channel, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message + doubles :channel, :queue, :encoder_registry, :encoder, :demultiplexer, :headers, :encoded_message, :decoded_message let :queueing_consumer do - described_class.new(channel, encoder_registry, demultiplexer) + described_class.new(channel, queue, subscription_options = {}, encoder_registry, demultiplexer) end - + describe '#deliver' do before do headers.stub(:content_type).and_return('application/stuff') diff --git a/spec/integration/autobahn_intg_spec.rb b/spec/integration/autobahn_intg_spec.rb index 92d8ca9..fa52072 100644 --- a/spec/integration/autobahn_intg_spec.rb +++ b/spec/integration/autobahn_intg_spec.rb @@ -34,7 +34,7 @@ def create_transport_system(name, options={}) before :all do begin NUM_NODES.times do |i| - connection = HotBunnies.connect(:host => HOSTNAME, :port => BASE_PORT + i) + connection = MarchHare.connect(:host => HOSTNAME, :port => BASE_PORT + i) channel = connection.create_channel exchange = channel.exchange(EXCHANGE_NAME, :type => :direct) NUM_QUEUES_PER_NODE.times do |j| @@ -59,8 +59,8 @@ def create_transport_system(name, options={}) end before :all do - @connection = HotBunnies.connect(:port => BASE_PORT) - @exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true) + @connection = MarchHare.connect(:port => BASE_PORT) + @exchange = @connection.create_channel.exchange(EXCHANGE_NAME, :passive => true, :type => :direct) @queues = NUM_QUEUES.times.map do |i| @connection.create_channel.queue(QUEUE_NAMES[i], :passive => true) end From 9575da55a6760dd9b23acd59e4500d4abb842776 Mon Sep 17 00:00:00 2001 From: Theo Date: Tue, 16 Dec 2014 11:19:15 +0100 Subject: [PATCH 4/4] Update the readme and gem descriptions to remove all mention of HotBunnies --- README.md | 3 ++- autobahn.gemspec | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 00b74bb..7a20947 100644 --- a/README.md +++ b/README.md @@ -30,4 +30,5 @@ The following example configures transport system that will transport messages J ## Requirements -Runs on JRuby 1.6.7 or newer (probably works with any 1.6.x), uses [HotBunnies](https://github.com/ruby-amqp/hot_bunnies). +* JRuby 1.7.x (might still work with 1.6.x) +* March Hare 2.7 or newer diff --git a/autobahn.gemspec b/autobahn.gemspec index 7f121a0..eeb85cc 100644 --- a/autobahn.gemspec +++ b/autobahn.gemspec @@ -13,7 +13,7 @@ Gem::Specification.new do |s| s.email = ['platform@burtcorp.com'] s.homepage = 'http://github.com/burtcorp/autobahn' s.summary = %q{Get the most out of RabbitMQ with the least amount of code} - s.description = %q{Autobahn is a transport system abstraction for HotBunnies/RabbitMQ that tries to maximize message throughput rates} + s.description = %q{Autobahn is a transport system abstraction for RabbitMQ/March Hare that tries to maximize message throughput rates} s.rubyforge_project = 'autobahn'