diff --git a/README.md b/README.md index aa376bd..ae10bad 100644 --- a/README.md +++ b/README.md @@ -18,16 +18,14 @@ And then execute: ### Conventions -- Subscription names follow the format `"#{topic_name}-subscription"` - Messages are hashes -- Message hashes are encoded to JSON for transport and decoded back to hashes when reading ### Receiving Messages ```ruby reader = Hover::PubSub::Reader.new( project_id: ENV['GCP_PUBSUB_PROJECT_ID'], - topic_names: ['list', 'of', 'topic', 'names'], + subscription_names: ['list', 'of', 'subscription', 'names'], ack_deadline: 30 ) @@ -38,12 +36,9 @@ end A `Reader` instance has a `#read` instance method that takes a block. The block is responsible for processing each message. If the block returns true, processing is considered successful and the message is acknowledged and deleted. If the block returns false the message goes back to the queue for another reader to attempt processing again. -When the `#read` method is called a thread is created for each topic subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows. - -`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object. - -`Reader` assumes your subscription names follow the pattern `"#{topic_name}-subscription"`. +When the `#read` method is called a thread is created for each subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows. +`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object. ### Publishing Messages diff --git a/lib/hover/pub_sub/reader.rb b/lib/hover/pub_sub/reader.rb index a848dc6..518bd15 100644 --- a/lib/hover/pub_sub/reader.rb +++ b/lib/hover/pub_sub/reader.rb @@ -6,25 +6,21 @@ module Hover module PubSub class Reader - def self.subscription_name(topic_name) - "#{topic_name}-subscription" - end - def self.parse_message(message) json = message.data JSON.parse(json) end - def initialize(project_id:, topic_names:, ack_deadline:) + def initialize(project_id:, subscription_names:, ack_deadline:) @project_id = project_id - @topic_names = topic_names + @subscription_names = subscription_names @ack_deadline = ack_deadline end def read - for_each_new_message do |topic_name, message| + for_each_new_message do |subscription_name, message| parsed_message = self.class.parse_message(message) - processed_successfully = yield(topic_name, parsed_message).eql?(true) + processed_successfully = yield(subscription_name, parsed_message).eql?(true) delete message if processed_successfully end @@ -33,10 +29,10 @@ def read private def for_each_new_message - threads = subscriptions.map do |topic_name, subscription| + threads = subscriptions.map do |subscription_name, subscription| Thread.new do subscription.pull(immediate: false).each do |message| - yield(topic_name, message) + yield(subscription_name, message) end end end @@ -50,18 +46,10 @@ def delete(message) end def subscriptions - @topic_names.map.with_object({}) do |topic_name, hash| - project.topic(topic_name, skip_lookup: true) - - hash[topic_name] = subscription(topic_name) - end - end - - def subscription(topic_name) - name = self.class.subscription_name(topic_name) - - project.subscription(name, skip_lookup: true).tap do |subscription| - subscription.deadline = @ack_deadline + @subscription_names.map.with_object({}) do |name, hash| + hash[name] = project.subscription(name, skip_lookup: true).tap do |subscription| + subscription.deadline = @ack_deadline + end end end diff --git a/spec/hover/pub_sub/reader_spec.rb b/spec/hover/pub_sub/reader_spec.rb index e696c5c..b702d0d 100644 --- a/spec/hover/pub_sub/reader_spec.rb +++ b/spec/hover/pub_sub/reader_spec.rb @@ -31,7 +31,7 @@ let(:instance) do described_class.new( project_id: @pubsub_project_id, - topic_names: [topic_name], + subscription_names: [subscription_name], ack_deadline: 1 ) end @@ -55,8 +55,8 @@ expect(messages.size).to eq(1) expect(messages.first).to eq(sent_message) - instance.read do |topic_name, message| - raise "No messages expected got #{message.inspect} on topic #{topic_name}" + instance.read do |subscription_name, message| + raise "No messages expected got #{message.inspect} on subscriptions #{subscription_name}" end end end