Skip to content

Commit 4a1530f

Browse files
authored
Support codecs generating multiple events (#56)
* tests: fix build - use https for fixture resource retrieval - use "latest" 5.15.x activemq - declare module encapsulation exports * feature: support codec emitting multiple events Resolves: #22
1 parent ab75930 commit 4a1530f

File tree

7 files changed

+61
-24
lines changed

7 files changed

+61
-24
lines changed

.ci/run.sh

+9-1
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,23 @@ env
55

66
set -ex
77

8-
export ACTIVEMQ_VERSION=5.15.15
8+
export ACTIVEMQ_VERSION=5.15.16
99
./setup_broker.sh
1010
bundle install
11+
12+
java_major_version="$(JRUBY_OPTS="" jruby -e 'puts ENV_JAVA["java.version"]&.slice(/(?!1[.])[0-9]+/)')"
13+
if (( "${java_major_version}" >= "9" )); then
14+
export JRUBY_OPTS="-J--add-exports=java.base/sun.security.ssl=ALL-UNNAMED${JRUBY_OPTS:+ }${JRUBY_OPTS}"
15+
fi
16+
1117
bundle exec rspec
18+
1219
./start_ssl_broker.sh
1320
bundle exec rspec -fd --tag integration --tag tls --tag ~plaintext
1421
./stop_broker.sh
1522

1623
./start_broker.sh
1724
bundle exec rspec -fd --tag integration --tag plaintext --tag ~tls
1825
./stop_broker.sh
26+
1927
./teardown_broker.sh

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.3.0
2+
- Added support for decoding multiple events from text or binary messages when using a codec that produces multiple events
3+
14
## 3.2.2
25
- Fix: Remove usage of `java_kind_of?` to allow this plugin to be supported for versions of Logstash using jruby-9.3.x
36
[#54](https://github.com/logstash-plugins/logstash-input-jms/pull/54)

lib/logstash/inputs/jms.rb

+32-20
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ def run(output_queue)
276276
# read from the queue/topic until :timeout is reached, or a message is available
277277
# (whichever comes first)
278278
do_receive_message(subscriber, timeout: @interval * 1000) do |message|
279-
queue_event(message, output_queue)
279+
queue_events(message, output_queue)
280280
break if stop?
281281
end
282282
end
@@ -296,20 +296,27 @@ def run(output_queue)
296296
end
297297
end # def run_consumer
298298

299-
def queue_event(msg, output_queue)
300-
begin
301-
if @include_body
302-
if msg.kind_of?(JMS::MapMessage)
303-
event = process_map_message(msg)
304-
elsif msg.kind_of?(JMS::TextMessage) || msg.kind_of?(JMS::BytesMessage)
305-
event = decode_message(msg)
306-
else
307-
@logger.error( "Unsupported message type #{msg.data.class.to_s}" )
308-
end
299+
def events(msg, &event_handler)
300+
return enum_for(:events, msg).to_a unless block_given?
301+
302+
if @include_body
303+
if msg.kind_of?(JMS::MapMessage)
304+
return process_map_message(msg, &event_handler)
305+
elsif msg.kind_of?(JMS::TextMessage) || msg.kind_of?(JMS::BytesMessage)
306+
return process_string_message(msg, &event_handler)
307+
else
308+
@logger.error( "Unsupported message type #{msg.data.class.to_s}" )
309309
end
310+
end
310311

311-
event ||= event_factory.new_event
312+
yield event_factory.new_event
313+
rescue => e
314+
@logger.error("Failed to create event", :message => msg, :exception => e,
315+
:backtrace => e.backtrace)
316+
end
312317

318+
def queue_events(msg, output_queue)
319+
events(msg).map do |event|
313320
# Here, we can use the JMS Enqueue timestamp as the @timestamp
314321
if @use_jms_timestamp && msg.jms_timestamp
315322
event.set("@timestamp", LogStash::Timestamp.at(msg.jms_timestamp / 1000, (msg.jms_timestamp % 1000) * 1000))
@@ -327,12 +334,9 @@ def queue_event(msg, output_queue)
327334
@properties_setter.call(event, properties)
328335
end
329336

330-
decorate(event)
331-
output_queue << event
332-
333-
rescue => e # parse or event creation error
334-
@logger.error("Failed to create event", :message => msg, :exception => e,
335-
:backtrace => e.backtrace)
337+
event
338+
end.each do |ready_event|
339+
output_queue << ready_event
336340
end
337341
end
338342

@@ -365,9 +369,17 @@ def to_string_keyed_hash(hash)
365369

366370
# @param msg [JMS::MapMessage]
367371
# @return [LogStash::Event]
368-
def process_map_message(msg)
372+
def process_map_message(msg, &event_handler)
369373
data = to_string_keyed_hash(msg.data)
370-
do_target_check_once_and_get_event_factory.new_event(data)
374+
yield do_target_check_once_and_get_event_factory.new_event(data)
375+
end
376+
377+
def process_string_message(msg, &event_handler)
378+
text = msg.to_s # javax.jms.TextMessage#getText (e.g. JSON payload)
379+
return event_factory.new_event if text.nil?
380+
381+
@codec.decode(text, &event_handler)
382+
@codec.flush(&event_handler)
371383
end
372384

373385
def do_target_check_once_and_get_event_factory

logstash-input-jms.gemspec

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-jms'
4-
s.version = '3.2.2'
4+
s.version = '3.3.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events from a Jms Broker"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -32,5 +32,6 @@ Gem::Specification.new do |s|
3232
s.add_runtime_dependency "jruby-jms", ">= 1.2.0" #(Apache 2.0 license)
3333
s.add_runtime_dependency 'semantic_logger', '< 4.0.0'
3434

35+
s.add_development_dependency 'logstash-codec-line', '~> 3.0'
3536
s.add_development_dependency 'logstash-devutils'
3637
end

setup_broker.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,6 @@ fi
99

1010
curl -s -o activemq-all.jar https://repo1.maven.org/maven2/org/apache/activemq/activemq-all/$ACTIVEMQ_VERSION/activemq-all-$ACTIVEMQ_VERSION.jar
1111
mv activemq-all.jar ./spec/inputs/fixtures/
12-
curl -s -o activemq-bin.tar.gz http://archive.apache.org/dist/activemq/$ACTIVEMQ_VERSION/apache-activemq-$ACTIVEMQ_VERSION-bin.tar.gz
12+
curl -s -o activemq-bin.tar.gz https://archive.apache.org/dist/activemq/$ACTIVEMQ_VERSION/apache-activemq-$ACTIVEMQ_VERSION-bin.tar.gz
1313
tar xvf activemq-bin.tar.gz
1414
mv apache-activemq-$ACTIVEMQ_VERSION activemq

spec/inputs/integration/jms_spec.rb

+13
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,19 @@
241241

242242
end
243243
end
244+
245+
context 'when using a multi-event codec' do
246+
let(:config) { super().merge('codec' => 'line') }
247+
let(:message) { 'one' + "\n" + 'two' + "\n" + 'three' }
248+
it 'emits multiple events' do
249+
send_message do |session|
250+
session.message(message)
251+
end
252+
expect(queue.size).to eql 3
253+
expect(queue.map { |e| e.get('message') }).to contain_exactly("one", "two", "three")
254+
expect(queue).to all(have_header_value("jms_destination", destination))
255+
end
256+
end
244257
end
245258

246259
context 'when the message is map message', :ecs_compatibility_support do

spec/inputs/unit/jms_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@
291291
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
292292

293293
plugin.register
294-
plugin.queue_event(jms_message_double, queue = [])
294+
plugin.queue_events(jms_message_double, queue = [])
295295
expect(queue.size).to eql 1
296296
@event = queue.first
297297
end

0 commit comments

Comments
 (0)