-
Notifications
You must be signed in to change notification settings - Fork 80
/
Copy pathhttp.rb
395 lines (327 loc) · 11 KB
/
http.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/json"
require "uri"
require "logstash/plugin_mixins/http_client"
require "zlib"
# Logstash output plugin that sends events to a generic HTTP(S) endpoint.
class LogStash::Outputs::Http < LogStash::Outputs::Base
include LogStash::PluginMixins::HttpClient
# NOTE(review): presumably declares that one plugin instance may be shared by
# several pipeline workers at once - confirm against the Logstash output API.
concurrency :shared
# Assigned in `register`: true when `format` is "json_batch" or "json_lines".
attr_accessor :is_batch
# HTTP verbs accepted by the `http_method` setting.
# Frozen so the shared list cannot be modified at runtime.
VALID_METHODS = ["put", "post", "patch", "delete", "get", "head"].freeze
# Manticore exception classes that `retryable_exception?` treats as retryable.
# Frozen for the same reason as VALID_METHODS.
RETRYABLE_MANTICORE_EXCEPTIONS = [
  ::Manticore::Timeout,
  ::Manticore::SocketException,
  ::Manticore::ClientProtocolException,
  ::Manticore::ResolutionFailure,
  ::Manticore::SocketTimeout
].freeze
# This output lets you send events to a
# generic HTTP(S) endpoint
#
# This output will execute up to 'pool_max' requests in parallel for performance.
# Consider this when tuning this plugin for performance.
#
# Additionally, note that when parallel execution is used strict ordering of events is not
# guaranteed!
#
# Beware, this gem does not yet support codecs. Please use the 'format' option for now.
config_name "http"
# URL to use
# NOTE(review): `:true` below is a Symbol rather than the boolean `true`; it is
# truthy, so the setting is presumably still treated as required - confirm.
config :url, :validate => :string, :required => :true
# The HTTP Verb. One of "put", "post", "patch", "delete", "get", "head"
config :http_method, :validate => VALID_METHODS, :required => :true
# Custom headers to use
# format is `headers => { "X-My-Header" => "%{host}" }`
#
# Outside of batch formats, each header value is formatted against the event
# being sent.
config :headers, :validate => :hash, :default => {}
# Content type
#
# If not specified, this defaults to the following:
#
# * if format is "json" or "json_batch", "application/json"
# * if format is "json_lines", "application/json_lines"
# * if format is "form", "application/x-www-form-urlencoded"
# * if format is "message", "text/plain"
#
# The resulting value is also written to the "Content-Type" request header.
config :content_type, :validate => :string
# Set this to false if you don't want this output to retry failed requests
config :retry_failed, :validate => :boolean, :default => true
# If encountered as response codes this plugin will retry these requests
config :retryable_codes, :validate => :number, :list => true, :default => [429, 500, 502, 503, 504]
# If you would like to consider some non-2xx codes to be successes
# enumerate them here. Responses returning these codes will be considered successes
config :ignorable_codes, :validate => :number, :list => true
# This lets you choose the structure and parts of the event that are sent.
#
#
# For example:
# [source,ruby]
# mapping => {"foo" => "%{host}"
# "bar" => "%{type}"}
config :mapping, :validate => :hash
# Set the format of the http body.
#
# If form, then the body will be the mapping (or whole event) converted
# into a query parameter string, e.g. `foo=bar&baz=fizz...`
#
# If message, then the body will be the result of formatting the event according to message
#
# If json_batch, then all events of a batch are sent in one request as a single JSON array.
#
# If json_lines, then all events of a batch are sent in one request, one JSON document
# per line.
#
# Otherwise, the event is sent as json.
config :format, :validate => ["json", "json_batch", "json_lines", "form", "message"], :default => "json"
# Set this to true if you want to format the url for batch requests. The url is
# formatted using the first event in the batch.
config :batch_url_format, :validate => :boolean, :default => false
# Set this to true if you want to enable gzip compression for your http requests
config :http_compression, :validate => :boolean, :default => false
# Template for the request body when format is "message"; it is formatted
# against each event. Must be set when format is "message".
config :message, :validate => :string
# Prepares the plugin for use: normalises the HTTP verb to a Symbol, picks a
# default content type for the configured format, records whether the format
# is a batch format, validates the "message" format settings and creates the
# timer used to schedule retries.
def register
  @http_method = @http_method.to_sym

  # Token queue intended to create backpressure: it starts out holding
  # @pool_max tokens, one per request that may be outstanding.
  # NOTE(review): no method visible in this file reads @request_tokens or
  # @requests afterwards - confirm whether they are still needed.
  @request_tokens = SizedQueue.new(@pool_max)
  @pool_max.times { @request_tokens << true }
  @requests = []

  if @content_type.nil?
    @content_type = {
      "form" => "application/x-www-form-urlencoded",
      "json" => "application/json",
      "json_batch" => "application/json",
      "json_lines" => "application/json_lines",
      "message" => "text/plain"
    }[@format]
  end

  @is_batch = ["json_batch", "json_lines"].include?(@format)
  @headers["Content-Type"] = @content_type

  validate_format!

  # Named timer, created with the daemon flag set to true
  @timer = java.util.Timer.new("HTTP Output #{self.params['id']}", true)
end # def register
# Entry point for a batch of events. Does nothing (and returns nil) for an
# empty batch; otherwise hands the batch to `send_events`.
def multi_receive(events)
  send_events(events) unless events.empty?
end
# Timer task that, when run, pushes an [event, attempt] pair back onto the
# queue it was given so the pair can be processed again.
class RetryTimerTask < java.util.TimerTask
  # pending - queue that receives the pair when the task runs
  # event   - the event (or batch of events) to retry
  # attempt - the attempt number to associate with the retry
  def initialize(pending, event, attempt)
    @queue, @event, @attempt = pending, event, attempt
    super()
  end

  def run
    @queue << [@event, @attempt]
  end
end
# Logs a response that is about to be retried. A 429 is logged at debug level
# (only when debug logging is enabled); any other code is logged as a warning
# together with the response code and body.
def log_retryable_response(response)
  unless response.code == 429
    return @logger.warn("Encountered a retryable HTTP request in HTTP output, will retry", :code => response.code, :body => response.body)
  end

  @logger.debug? && @logger.debug("Encountered a 429 response, will retry. This is not serious, just flow control via HTTP")
end
# Reports a non-retryable, non-successful response through `log_failure`,
# including the response code, the request url and the event.
def log_error_response(response, url, event)
  summary = "Encountered non-2xx HTTP code #{response.code}"
  details = { :response_code => response.code, :url => url, :event => event }
  log_failure(summary, details)
end
# Sends `events` and blocks until every unit of work has ended in either
# success or failure.
#
# A unit of work is the whole `events` array when @is_batch is set, otherwise
# a single event. Units are taken from a local Queue; a unit whose outcome is
# :retry is pushed back onto that queue by a RetryTimerTask scheduled on
# @timer, so the loop keeps blocking on `pending.pop` until the retry arrives.
#
# A StandardError raised while handling an outcome is logged and re-raised.
def send_events(events)
successes = java.util.concurrent.atomic.AtomicInteger.new(0)
failures = java.util.concurrent.atomic.AtomicInteger.new(0)
# NOTE: `retries` is incremented below but never read in this method.
retries = java.util.concurrent.atomic.AtomicInteger.new(0)
# Number of success/failure outcomes required before the loop may stop.
event_count = @is_batch ? 1 : events.size
pending = Queue.new
if @is_batch
# The entire batch is sent as one request.
pending << [events, 0]
else
events.each {|e| pending << [e, 0]}
end
# Blocks on pop; the :done marker pushed below is what ends the loop.
while popped = pending.pop
break if popped == :done
event, attempt = popped
action, event, attempt = send_event(event, attempt)
begin
# With retries disabled a retryable outcome is counted as a failure.
action = :failure if action == :retry && !@retry_failed
case action
when :success
successes.incrementAndGet
when :retry
retries.incrementAndGet
next_attempt = attempt+1
sleep_for = sleep_for_attempt(next_attempt)
@logger.info("Retrying http request, will sleep for #{sleep_for} seconds")
# The task re-enqueues [event, next_attempt] on `pending` when it runs.
# sleep_for is in seconds; the delay handed to the timer is that value
# times 1000 (java.util.Timer#schedule takes milliseconds).
timer_task = RetryTimerTask.new(pending, event, next_attempt)
@timer.schedule(timer_task, sleep_for*1000)
when :failure
failures.incrementAndGet
else
raise "Unknown action #{action}"
end
if action == :success || action == :failure
# Every unit has reached a final outcome: tell the loop to stop.
if successes.get+failures.get == event_count
pending << :done
end
end
rescue => e
# This should never happen unless there's a flat out bug in the code
@logger.error("Error sending HTTP Request",
:class => e.class.name,
:message => e.message,
:backtrace => e.backtrace)
failures.incrementAndGet
# Re-raised; the method-level rescue below logs it a second time.
raise e
end
end
rescue => e
@logger.error("Error in http output loop",
:class => e.class.name,
:message => e.message,
:backtrace => e.backtrace)
raise e
end
# Returns how long to wait before the given retry attempt. The square of the
# attempt number, capped at 60, is the ceiling; the result is half of that
# ceiling plus half of a random value between 0 and the ceiling.
def sleep_for_attempt(attempt)
  ceiling = [attempt**2, 60].min
  fixed_part = ceiling / 2
  random_part = rand(0..ceiling) / 2
  fixed_part + random_part
end
# Performs one HTTP request for `event` and classifies the outcome.
#
# event   - a single event, or an Array of events when @is_batch is set
# attempt - the attempt number; it is passed through unchanged
#
# Returns a three-element Array [action, event, attempt] where action is
# :success, :retry or :failure. Exceptions raised while building or sending
# the request are logged through `log_failure` and turned into :retry (when
# `retryable_exception?` says so) or :failure instead of propagating.
def send_event(event, attempt)
  body = event_body(event)

  # In batch mode the url is only formatted when batch_url_format is set, and
  # then against the first event of the batch.
  if @is_batch
    url = @batch_url_format ? event[0].sprintf(@url) : @url
  else
    url = event.sprintf(@url)
  end

  # Work on a copy in batch mode: the Content-Encoding header added below must
  # not be written into @headers, which is shared by every call on this plugin.
  headers = @is_batch ? @headers.dup : event_headers(event)

  # Compress the body and add the matching header
  if @http_compression
    headers["Content-Encoding"] = "gzip"
    body = gzip(body)
  end

  # Build the request and execute it; the value returned by `.call` is the
  # response inspected below.
  response = client.send(@http_method, url, :body => body, :headers => headers).call

  return :success, event, attempt if response_success?(response)

  if retryable_response?(response)
    log_retryable_response(response)
    return :retry, event, attempt
  end

  log_error_response(response, url, event)
  return :failure, event, attempt
rescue => exception
  will_retry = retryable_exception?(exception)
  log_failure("Could not fetch URL",
    :url => url,
    :method => @http_method,
    :body => body,
    :headers => headers,
    :message => exception.message,
    :class => exception.class.name,
    :backtrace => exception.backtrace,
    :will_retry => will_retry
  )
  outcome = will_retry ? :retry : :failure
  return outcome, event, attempt
end
# Releases the resources held by the plugin: cancels the retry timer and
# closes the HTTP client.
def close
  # @timer is assigned as the last step of `register`, after validation that
  # may raise, so it can still be nil here; the client must be closed anyway.
  @timer.cancel if @timer
  client.close
end
private
# True when the response code is listed in @ignorable_codes (if configured) or
# lies in the 200..299 range; false otherwise.
def response_success?(response)
  status = response.code
  if @ignorable_codes && @ignorable_codes.include?(status)
    true
  else
    status.between?(200, 299)
  end
end
# Truthy when @retryable_codes is configured and contains the response code.
# Returns @retryable_codes itself (nil or false) when it is not set.
def retryable_response?(response)
  codes = @retryable_codes
  return codes unless codes

  codes.include?(response.code)
end
# True when the exception is an instance of (a subclass of) any class listed
# in RETRYABLE_MANTICORE_EXCEPTIONS, false otherwise.
def retryable_exception?(exception)
  case exception
  when *RETRYABLE_MANTICORE_EXCEPTIONS then true
  else false
  end
end
# Logs a failure at error level, prefixing the message with a fixed tag.
# Kept as its own method mostly so tests can hook into it.
def log_failure(message, opts)
  tagged = "[HTTP Output Failure] #{message}"
  @logger.error(tagged, opts)
end
# Builds the HTTP request body for the configured format.
#
# For "json_batch" and "json_lines" `event` is a collection of events; for
# every other format it is a single event. Any format not listed below falls
# through to the form encoding.
def event_body(event)
  # TODO: Create an HTTP post data codec, use that here
  case @format
  when "json"
    LogStash::Json.dump(map_event(event))
  when "message"
    event.sprintf(@message)
  when "json_batch"
    mapped = event.map { |single| map_event(single) }
    LogStash::Json.dump(mapped)
  when "json_lines"
    lines = event.map { |single| LogStash::Json.dump(map_event(single)) }
    lines.join("\n")
  else
    encode(map_event(event))
  end
end
# Gzip-compresses `data` into an in-memory buffer and returns the compressed
# bytes as a String.
def gzip(data)
  buffer = StringIO.new.tap { |io| io.set_encoding("BINARY") }
  Zlib::GzipWriter.new(buffer).tap { |writer| writer.write(data) }.close
  buffer.string
end
# Recursively formats a mapping against an event: Hash values and Array
# elements are converted in turn (keys are kept as they are), and every other
# value is passed to `event.sprintf`. Returns a new structure.
def convert_mapping(mapping, event)
  case mapping
  when Hash
    mapping.each_with_object({}) do |(key, value), converted|
      converted[key] = convert_mapping(value, event)
    end
  when Array
    mapping.map { |item| convert_mapping(item, event) }
  else
    event.sprintf(mapping)
  end
end
# Returns the data to send for an event: the configured mapping formatted
# against the event when @mapping is set, otherwise `event.to_hash`.
def map_event(event)
  return event.to_hash unless @mapping

  convert_mapping(@mapping, event)
end
# Headers for a single event: the result of `custom_headers`, or an empty
# Hash when that returns nothing.
def event_headers(event)
  rendered = custom_headers(event)
  rendered ? rendered : {}
end
# Builds a new Hash of the configured headers with every value formatted
# against the event. Returns nil when @headers is not set.
def custom_headers(event)
  return nil unless @headers

  @headers.each_with_object({}) do |(name, template), rendered|
    rendered[name] = event.sprintf(template)
  end
end
# Encodes a Hash as a form-style string: each key and each value (converted
# with to_s) is CGI-escaped, pairs are written as key=value and joined by "&".
#TODO Extract this to a codec
def encode(hash)
  pairs = hash.map do |key, value|
    "#{CGI.escape(key)}=#{CGI.escape(value.to_s)}"
  end
  pairs.join("&")
end
# Checks the settings that the "message" format depends on. Does nothing for
# any other format.
#
# Raises RuntimeError when format is "message" and either `message` or
# `content_type` is nil. Logs a warning when a mapping is also configured.
def validate_format!
  return unless @format == "message"

  raise "message must be set if message format is used" if @message.nil?
  raise "content_type must be set if message format is used" if @content_type.nil?

  @logger.warn "mapping is not supported and will be ignored if message format is used" unless @mapping.nil?
end
end