Skip to content

Commit f190428

Browse files
authored
Fix: improve compatibility with MessageConsumer implementations (#51)
* avoids JMS MessageConsumer get/each extension methods
1 parent dc6e58e commit f190428

File tree

4 files changed

+31
-8
lines changed

4 files changed

+31
-8
lines changed

CHANGELOG.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1-
## unreleased
2-
- Fix: Fix test failures due to ECS compatibility default changes in `8.x` of logstash [#53](https://github.com/logstash-plugins/logstash-input-jms/pull/53)
1+
## 3.2.1
2+
- Fix: improve compatibility with MessageConsumer implementations [#51](https://github.com/logstash-plugins/logstash-input-jms/pull/51),
3+
such as IBM MQ.
4+
- Test: Fix test failures due to ECS compatibility default changes in `8.x` of logstash [#53](https://github.com/logstash-plugins/logstash-input-jms/pull/53)
35

46
## 3.2.0
57
- Feat: event_factory support + targets to aid ECS [#49](https://github.com/logstash-plugins/logstash-input-jms/pull/49)

lib/logstash/inputs/jms.rb

+26-4
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ class LogStash::Inputs::Jms < LogStash::Inputs::Threadable
8484
# If pub-sub (topic) style should be used.
8585
config :pub_sub, :validate => :boolean, :default => false
8686

87-
# Durable subscriber settings.
87+
# Durable message_consumer settings.
8888
# By default the `durable_subscriber_name` will be set to the topic, and `durable_subscriber_client_id` will be set
8989
# to 'Logstash'
9090
config :durable_subscriber, :validate => :boolean, :default => false
@@ -273,9 +273,9 @@ def run(output_queue)
273273
params = {:timeout => @timeout * 1000, :selector => @selector}
274274
subscriber = subscriber(session, params)
275275
until stop?
276-
# This will read from the queue/topic until :timeout is breached, or messages are available whichever comes
277-
# first.
278-
subscriber.each({:timeout => @interval * 1000}) do |message|
276+
# read from the queue/topic until :timeout is reached, or a message is available
277+
# (whichever comes first)
278+
do_receive_message(subscriber, timeout: @interval * 1000) do |message|
279279
queue_event(message, output_queue)
280280
break if stop?
281281
end
@@ -336,6 +336,28 @@ def queue_event(msg, output_queue)
336336
end
337337
end
338338

339+
# Loop through messages received and yield them.
340+
#
341+
# NOTE: a simplified replacement for JMS::MessageConsumer#each and #get (extensions).
342+
#
343+
# @param message_consumer [javax.jms.MessageConsumer]
344+
# @param timeout in milliseconds
345+
def do_receive_message(message_consumer, timeout: 0)
346+
# Receive messages according to timeout
347+
while true
348+
case timeout
349+
when 0 # Return immediately if no message is available
350+
message = message_consumer.receiveNoWait()
351+
when -1 # Wait forever
352+
message = message_consumer.receive()
353+
else # Wait for x milli-seconds for a message to be received from the broker
354+
message = message_consumer.receive(timeout)
355+
end
356+
break unless message
357+
yield message
358+
end
359+
end
360+
339361
def to_string_keyed_hash(hash)
340362
hash.inject({}) { |h, (key, val)| h[key.to_s] = val; h }
341363
end

logstash-input-jms.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-jms'
4-
s.version = '3.2.0'
4+
s.version = '3.2.1'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events from a Jms Broker"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/spec_helper.rb

-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def send_message(&block)
3232
end
3333
input.run(queue)
3434

35-
destination = "#{pub_sub ? 'topic' : 'queue'}://#{queue_name}"
3635
tt.join(3)
3736
end
3837

0 commit comments

Comments
 (0)