# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "base64"
# .Compatibility Note
# [NOTE]
# ================================================================================
# Starting with Elasticsearch 5.3, there's an {ref}modules-http.html[HTTP setting]
# called `http.content_type.required`. If this option is set to `true`, and you
# are using Logstash 2.4 through 5.2, you need to update the Elasticsearch input
# plugin to version 4.0.2 or higher.
#
# ================================================================================
#
# Read from an Elasticsearch cluster, based on search query results.
# This is useful for replaying test logs, reindexing, etc.
#
# Example:
# [source,ruby]
#     input {
#       # Read all documents from Elasticsearch matching the given query
#       elasticsearch {
#         hosts => "localhost"
#         query => '{ "query": { "match": { "statuscode": 200 } }, "sort": [ "_doc" ] }'
#       }
#     }
#
# This would create an Elasticsearch query with the following format:
# [source,json]
#     curl 'http://localhost:9200/logstash-*/_search?&scroll=1m&size=1000' -d '{
#       "query": {
#         "match": {
#           "statuscode": 200
#         }
#       },
#       "sort": [ "_doc" ]
#     }'
#
class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
  config_name "elasticsearch"

  default :codec, "json"

  # List of Elasticsearch hosts to use for querying.
  # Each host can be specified as IP, HOST, IP:port, or HOST:port.
  # The port defaults to 9200.
  config :hosts, :validate => :array

  # The index or alias to search.
  config :index, :validate => :string, :default => "logstash-*"

  # The query to be executed. Read the Elasticsearch query DSL documentation
  # for more information:
  # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
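  #
  # For instance, a query restricted to a time window could look like the
  # sketch below (the `@timestamp` field name and the range bounds are
  # illustrative, not required by the plugin):
  # [source,ruby]
  #     input {
  #       elasticsearch {
  #         hosts => "localhost"
  #         query => '{ "query": { "range": { "@timestamp": { "gte": "now-1d/d", "lt": "now/d" } } }, "sort": [ "_doc" ] }'
  #       }
  #     }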
  config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'

  # This allows you to set the maximum number of hits returned per scroll.
  config :size, :validate => :number, :default => 1000

  # This parameter controls the keepalive time of the scrolling request,
  # expressed as an Elasticsearch time value (for example "1m"), and it
  # initiates the scrolling process. The timeout applies per round trip
  # (i.e. between one scroll request and the next).
  config :scroll, :validate => :string, :default => "1m"
  # If set, include Elasticsearch document information such as index, type, and
  # the id in the event.
  #
  # Note that if you are ingesting documents with the intent to re-index
  # them (or just update them), the `action` option in the elasticsearch
  # output needs to know how to handle those documents. It can be assigned
  # dynamically from a field added to the metadata.
  #
  # Example
  # [source, ruby]
  #     input {
  #       elasticsearch {
  #         hosts => "es.production.mysite.org"
  #         index => "mydata-2018.09.*"
  #         query => "*"
  #         size => 500
  #         scroll => "5m"
  #         docinfo => true
  #       }
  #     }
  #     output {
  #       elasticsearch {
  #         index => "copy-of-production.%{[@metadata][_index]}"
  #         document_type => "%{[@metadata][_type]}"
  #         document_id => "%{[@metadata][_id]}"
  #       }
  #     }
  #
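  # As a sketch, the `action` of the elasticsearch output could be driven from
  # metadata as well; here a hypothetical `[@metadata][action]` field (set
  # elsewhere in the pipeline, e.g. by a mutate filter) selects the action
  # per document:
  # [source, ruby]
  #     output {
  #       elasticsearch {
  #         action => "%{[@metadata][action]}"
  #         document_id => "%{[@metadata][_id]}"
  #       }
  #     }
  #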
  config :docinfo, :validate => :boolean, :default => false

  # Where to move the Elasticsearch document information. By default we use the @metadata field.
  config :docinfo_target, :validate => :string, :default => LogStash::Event::METADATA

  # List of document metadata to move to the `docinfo_target` field.
  # To learn more about Elasticsearch metadata fields read
  # http://www.elasticsearch.org/guide/en/elasticsearch/guide/current/_document_metadata.html
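  #
  # For example, to keep only the index and id (a sketch; any subset of the
  # default fields works the same way):
  # [source, ruby]
  #     docinfo_fields => ["_index", "_id"]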
  config :docinfo_fields, :validate => :array, :default => ['_index', '_type', '_id']

  # Basic Auth - username
  config :user, :validate => :string

  # Basic Auth - password
  config :password, :validate => :password

  # SSL
  config :ssl, :validate => :boolean, :default => false

  # SSL verify certificate
  config :ssl_verify, :validate => :boolean, :default => true

  # SSL Certificate Authority file in PEM encoded format, must also include any chain certificates as necessary
  config :ca_file, :validate => :path

  # SSL Client Certificate file in PEM encoded format
  config :client_cert_file, :validate => :path

  # SSL Client Key file in PEM encoded format
  config :client_key_file, :validate => :path
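
  # A sketch combining the authentication and SSL options above (the host
  # name, credentials, and file paths are illustrative):
  # [source, ruby]
  #     input {
  #       elasticsearch {
  #         hosts => "es.example.org:9200"
  #         user => "logstash_reader"
  #         password => "changeme"
  #         ssl => true
  #         ca_file => "/etc/logstash/certs/ca.pem"
  #         client_cert_file => "/etc/logstash/certs/client.pem"
  #         client_key_file => "/etc/logstash/certs/client.key"
  #       }
  #     }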
  def register
    require "elasticsearch"

    # Options passed to the initial search request.
    @options = {
      :index => @index,
      :body => @query,
      :scroll => @scroll,
      :size => @size
    }

    transport_options = {}

    # Build the Basic Auth header when credentials are supplied.
    if @user && @password
      token = Base64.strict_encode64("#{@user}:#{@password.value}")
      transport_options[:headers] = { :Authorization => "Basic #{token}" }
    end

    # When SSL is enabled, force the https scheme on every host.
    hosts = if @ssl then
      @hosts.map do |h|
        host, port = h.split(":")
        { :host => host, :scheme => 'https', :port => port }
      end
    else
      @hosts
    end

    if @ssl
      transport_options[:ssl] = { :verify => @ssl_verify }
      if @ca_file
        transport_options[:ssl][:ca_file] = @ca_file
      end
      if @client_cert_file
        transport_options[:ssl][:client_cert] = OpenSSL::X509::Certificate.new(File.read(@client_cert_file))
      end
      if @client_key_file
        transport_options[:ssl][:client_key] = OpenSSL::PKey::RSA.new(File.read(@client_key_file))
      end
    end

    @client = Elasticsearch::Client.new(:hosts => hosts, :transport_options => transport_options)
  end

  def run(output_queue)
    # get first wave of data
    r = @client.search(@options)
    r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
    has_hits = r['hits']['hits'].any?

    # keep scrolling until the result set is exhausted or shutdown is requested
    while has_hits && !stop?
      r = process_next_scroll(output_queue, r['_scroll_id'])
      has_hits = r['has_hits']
    end
  end

  private

  def process_next_scroll(output_queue, scroll_id)
    r = scroll_request(scroll_id)
    r['hits']['hits'].each { |hit| push_hit(hit, output_queue) }
    {'has_hits' => r['hits']['hits'].any?, '_scroll_id' => r['_scroll_id']}
  end

  def push_hit(hit, output_queue)
    event = LogStash::Event.new(hit['_source'])
    decorate(event)

    if @docinfo
      # do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
      docinfo_target = event.get(@docinfo_target) || {}

      unless docinfo_target.is_a?(Hash)
        @logger.error("Elasticsearch Input: Incompatible Event, incompatible type for the docinfo_target=#{@docinfo_target} field in the `_source` document, expected a hash got:", :docinfo_target_type => docinfo_target.class, :event => event)

        # TODO: (colin) I am not sure raising is a good strategy here?
        raise Exception.new("Elasticsearch input: incompatible event")
      end

      @docinfo_fields.each do |field|
        docinfo_target[field] = hit[field]
      end

      event.set(@docinfo_target, docinfo_target)
    end

    output_queue << event
  end

  def scroll_request(scroll_id)
    @client.scroll(:body => { :scroll_id => scroll_id }, :scroll => @scroll)
  end
end