Skip to content

Commit

Permalink
dont assume subscription names, you may have many for the same topic …
Browse files Browse the repository at this point in the history
…for fanout
  • Loading branch information
glytch committed Aug 19, 2020
1 parent 5d2231c commit bc0efbe
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 33 deletions.
11 changes: 3 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ And then execute:

### Conventions

- Subscription names follow the format `"#{topic_name}-subscription"`
- Messages are hashes
- Message hashes are encoded to JSON for transport and decoded back to hashes when reading

### Receiving Messages

```ruby
reader = Hover::PubSub::Reader.new(
project_id: ENV['GCP_PUBSUB_PROJECT_ID'],
topic_names: ['list', 'of', 'topic', 'names'],
subscription_names: ['list', 'of', 'subscription', 'names'],
ack_deadline: 30
)

Expand All @@ -38,12 +36,9 @@ end

A `Reader` instance has a `#read` instance method that takes a block. The block is responsible for processing each message. If the block returns true, processing is considered successful and the message is acknowledged and deleted. If the block returns false the message goes back to the queue for another reader to attempt processing again.

When the `#read` method is called a thread is created for each topic subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows.

`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object.

`Reader` assumes your subscription names follow the pattern `"#{topic_name}-subscription"`.
When the `#read` method is called a thread is created for each subscription. And all subscriptions are read from concurrently. It is safe to have more than one reader reading at the same time. With that you can scale up the number of active readers as the number of messages needing to be processed grows.

`#read` does not yield the [received message](https://googleapis.dev/ruby/google-cloud-pubsub/latest/Google/Cloud/PubSub/ReceivedMessage.html) objects to your block. It assumes your messages are JSON strings and decodes them and returns the decoded object.

### Publishing Messages

Expand Down
32 changes: 10 additions & 22 deletions lib/hover/pub_sub/reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,21 @@
module Hover
module PubSub
class Reader
def self.subscription_name(topic_name)
"#{topic_name}-subscription"
end

def self.parse_message(message)
json = message.data
JSON.parse(json)
end

def initialize(project_id:, topic_names:, ack_deadline:)
def initialize(project_id:, subscription_names:, ack_deadline:)
@project_id = project_id
@topic_names = topic_names
@subscription_names = subscription_names
@ack_deadline = ack_deadline
end

def read
for_each_new_message do |topic_name, message|
for_each_new_message do |subscription_name, message|
parsed_message = self.class.parse_message(message)
processed_successfully = yield(topic_name, parsed_message).eql?(true)
processed_successfully = yield(subscription_name, parsed_message).eql?(true)

delete message if processed_successfully
end
Expand All @@ -33,10 +29,10 @@ def read
private

def for_each_new_message
threads = subscriptions.map do |topic_name, subscription|
threads = subscriptions.map do |subscription_name, subscription|
Thread.new do
subscription.pull(immediate: false).each do |message|
yield(topic_name, message)
yield(subscription_name, message)
end
end
end
Expand All @@ -50,18 +46,10 @@ def delete(message)
end

def subscriptions
@topic_names.map.with_object({}) do |topic_name, hash|
project.topic(topic_name, skip_lookup: true)

hash[topic_name] = subscription(topic_name)
end
end

def subscription(topic_name)
name = self.class.subscription_name(topic_name)

project.subscription(name, skip_lookup: true).tap do |subscription|
subscription.deadline = @ack_deadline
@subscription_names.map.with_object({}) do |name, hash|
hash[name] = project.subscription(name, skip_lookup: true).tap do |subscription|
subscription.deadline = @ack_deadline
end
end
end

Expand Down
6 changes: 3 additions & 3 deletions spec/hover/pub_sub/reader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
let(:instance) do
described_class.new(
project_id: @pubsub_project_id,
topic_names: [topic_name],
subscription_names: [subscription_name],
ack_deadline: 1
)
end
Expand All @@ -55,8 +55,8 @@
expect(messages.size).to eq(1)
expect(messages.first).to eq(sent_message)

instance.read do |topic_name, message|
raise "No messages expected got #{message.inspect} on topic #{topic_name}"
instance.read do |subscription_name, message|
raise "No messages expected got #{message.inspect} on subscriptions #{subscription_name}"
end
end
end
Expand Down

0 comments on commit bc0efbe

Please sign in to comment.