22
22
23
23
let ( :input ) { LogStash ::Inputs ::SQS . new ( config ) }
24
24
let ( :decoded_message ) { { "bonjour" => "awesome" } }
25
- let ( :encoded_message ) { double ( "sqs_message" , :body => LogStash ::Json ::dump ( decoded_message ) ) }
25
+ let ( :encoded_message ) { double ( "sqs_message" , :body => LogStash ::Json ::dump ( decoded_message ) ) , :message_attributes => { } }
26
26
27
27
subject { input }
28
28
110
110
111
111
# We have to make sure we create a bunch of events
112
112
# so we actually really try to stop the plugin.
113
- #
114
- # rspec's `and_yield` allow you to define a fix amount of possible
113
+ #
114
+ # rspec's `and_yield` allow you to define a fix amount of possible
115
115
# yielded values and doesn't allow you to create infinite loop.
116
116
# And since we are actually creating thread we need to make sure
117
117
# we have enough work to keep the thread working until we kill it..
@@ -136,14 +136,16 @@ def poll(polling_options = {})
136
136
let ( :md5_of_body ) { "dr strange" }
137
137
let ( :sent_timestamp ) { LogStash ::Timestamp . new }
138
138
let ( :epoch_timestamp ) { ( sent_timestamp . utc . to_f * 1000 ) . to_i }
139
+ let ( :message_attributes ) { :some_metadata => "add this" }
139
140
140
141
let ( :id_field ) { "my_id_field" }
141
142
let ( :md5_field ) { "my_md5_field" }
142
143
let ( :sent_timestamp_field ) { "my_sent_timestamp_field" }
144
+ let ( :user_attributes_field ) { "my_user_attributes_field" }
143
145
144
146
let ( :message ) do
145
- double ( "message" , :message_id => message_id , :md5_of_body => md5_of_body , :attributes => { LogStash ::Inputs ::SQS ::SENT_TIMESTAMP => epoch_timestamp } )
146
- end
147
+ double ( "message" , :message_id => message_id , :md5_of_body => md5_of_body , :attributes => { LogStash ::Inputs ::SQS ::SENT_TIMESTAMP => epoch_timestamp } , :message_attributes => message_attributes )
148
+ end
147
149
148
150
subject { input . add_sqs_data ( event , message ) }
149
151
@@ -156,7 +158,8 @@ def poll(polling_options = {})
156
158
"queue" => queue_name ,
157
159
"id_field" => id_field ,
158
160
"md5_field" => md5_field ,
159
- "sent_timestamp_field" => sent_timestamp_field
161
+ "sent_timestamp_field" => sent_timestamp_field ,
162
+ "user_attributes_field" => user_attributes_field
160
163
}
161
164
end
162
165
@@ -198,7 +201,7 @@ def poll(polling_options = {})
198
201
end
199
202
200
203
context "receiving messages" do
201
- before do
204
+ before do
202
205
expect ( subject ) . to receive ( :poller ) . and_return ( mock_sqs ) . at_least ( :once )
203
206
end
204
207
@@ -235,7 +238,7 @@ def poll(polling_options = {})
235
238
it "retry to fetch messages" do
236
239
# change the poller implementation to raise SQS errors.
237
240
had_error = false
238
-
241
+
239
242
# actually using the child of `Object` to do an expectation of `#sleep`
240
243
expect ( subject ) . to receive ( :sleep ) . with ( LogStash ::Inputs ::SQS ::BACKOFF_SLEEP_TIME )
241
244
expect ( mock_sqs ) . to receive ( :poll ) . with ( anything ( ) ) . at_most ( 2 ) do
0 commit comments