Skip to content

Commit e82b2aa

Browse files
committed
✨ Add use_metadata option
1 parent 7f1099e commit e82b2aa

File tree

5 files changed

+172
-2
lines changed

5 files changed

+172
-2
lines changed

Diff for: docs/index.asciidoc

+23
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ This plugin supports the following configuration options plus the
367367
| <<plugins-{type}s-{plugin}-truststore_password>> |<<password,password>>|No
368368
| <<plugins-{type}s-{plugin}-upsert>> |<<string,string>>|No
369369
| <<plugins-{type}s-{plugin}-user>> |<<string,string>>|No
370+
| <<plugins-{type}s-{plugin}-use_metadata>> |<<boolean,boolean>>|No
370371
| <<plugins-{type}s-{plugin}-validate_after_inactivity>> |<<number,number>>|No
371372
| <<plugins-{type}s-{plugin}-version>> |<<string,string>>|No
372373
| <<plugins-{type}s-{plugin}-version_type>> |<<string,string>>, one of `["internal", "external", "external_gt", "external_gte", "force"]`|No
@@ -1095,6 +1096,28 @@ Create a new document with this parameter as json string if `document_id` doesn'
10951096

10961097
Username to authenticate to a secure Elasticsearch cluster
10971098

1099+
[id="plugins-{type}s-{plugin}-use_metadata"]
1100+
===== `use_metadata`
1101+
1102+
* Value type is <<boolean,boolean>>
1103+
* Default value is `false`
1104+
1105+
Use, and give preference to, output parameters defined in the event metadata. When enabled, the <<plugins-{type}s-{plugin}-index>> (`@metadata._index`), <<plugins-{type}s-{plugin}-document_id>> (`@metadata._id`), and <<plugins-{type}s-{plugin}-pipeline>> (`@metadata.pipeline`) settings are overridden by their respective `@metadata` fields whenever those fields are present on the event. The `@metadata._index` field is ignored when ILM is enabled.
1106+
1107+
For example, to index a document into the index `myindex` with the id `myid` using the ingest pipeline `mypipeline`, the event would look like this:
1108+
1109+
[source,json]
1110+
-----
1111+
{
1112+
"message": "foo",
1113+
"@metadata": {
1114+
"_index": "myindex",
1115+
"_id": "myid",
1116+
"pipeline": "mypipeline"
1117+
}
1118+
}
1119+
-----
1120+
10981121
[id="plugins-{type}s-{plugin}-validate_after_inactivity"]
10991122
===== `validate_after_inactivity`
11001123

Diff for: lib/logstash/outputs/elasticsearch.rb

+11
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
251251
# ILM policy to use, if undefined the default policy will be used.
252252
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
253253

254+
# When true, prefer `[@metadata][_id]`, `[@metadata][_index]` (unless ILM is enabled) and `[@metadata][pipeline]` from the event over the configured values.
255+
config :use_metadata, :validate => :boolean, :default => false
256+
254257
attr_reader :client
255258
attr_reader :default_index
256259
attr_reader :default_ilm_rollover_alias
@@ -428,6 +431,14 @@ def common_event_params(event)
428431
params[:pipeline] = value unless value.empty?
429432
end
430433

434+
if @use_metadata
435+
logger.debug("Original params", params: params)
436+
params[:_id] = event.get("[@metadata][_id]") || params[:_id]
437+
params[:_index] = event.sprintf(event.get("[@metadata][_index]")) if !@ilm_enabled && event.get("[@metadata][_index]")
438+
params[:pipeline] = event.sprintf(event.get("[@metadata][pipeline]")) if event.get("[@metadata][pipeline]")
439+
logger.debug("@metadata params", params: params)
440+
end
441+
431442
params
432443
end
433444

Diff for: spec/integration/outputs/index_spec.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
end
4747

4848
describe "indexing" do
49-
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type) }
49+
let(:event) { LogStash::Event.new("message" => "Hello World!", "type" => type, "@metadata" => { "_id" => "test-id", "_index" => "test-index", "pipeline" => "test-pipeline" }) }
5050
let(:index) { 10.times.collect { rand(10).to_s }.join("") }
5151
let(:type) { ESHelper.es_version_satisfies?("< 7") ? "doc" : "_doc" }
5252
let(:event_count) { 1 + rand(2) }

Diff for: spec/integration/outputs/index_version_spec.rb

+31
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,36 @@
9494
expect(r2["_source"]["message"]).to eq('foo')
9595
end
9696
end
97+
98+
describe "use metadata" do
99+
let(:settings) do
100+
{
101+
# "manage_template" => false,
102+
"index" => "logstash-index",
103+
# "template_overwrite" => true,
104+
"hosts" => get_host_port(),
105+
# "action" => "index",
106+
# "document_id" => "%{my_id}",
107+
"use_metadata" => true,
108+
}
109+
end
110+
111+
it "should use @metadata._id for document_id" do
112+
id = "new_doc_id_1"
113+
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id }, "message" => "foo")])
114+
r = es.get(:index => "logstash-index", :type => doc_type, :id => id, :refresh => true)
115+
expect(r["_id"]).to eq(id)
116+
expect(r["_source"]["message"]).to eq("foo")
117+
end
118+
it "should use @metadata._index for index" do
119+
id = "new_doc_id_2"
120+
new_index = "logstash-index-new"
121+
subject.multi_receive([LogStash::Event.new("@metadata" => { "_id" => id, "_index" => new_index }, "message" => "foo")])
122+
r = es.get(:index => new_index, :type => doc_type, :id => id, :refresh => true)
123+
expect(r["_id"]).to eq(id)
124+
expect(r["_index"]).to eq(new_index)
125+
expect(r["_source"]["message"]).to eq("foo")
126+
end
127+
end
97128
end
98129
end

Diff for: spec/integration/outputs/ingest_pipeline_spec.rb

+106-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
end
6161

6262
it "indexes using the proper pipeline" do
63-
results = @es.search(:index => 'logstash-*', :q => "message:\"netcat\"")
63+
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
6464
expect(results).to have_hits(1)
6565
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
6666
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
@@ -72,3 +72,108 @@
7272
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
7373
end
7474
end
75+
76+
describe "Ingest pipeline execution behavior with metadata", :integration => true do
77+
subject! do
78+
require "logstash/outputs/elasticsearch"
79+
settings = {
80+
"hosts" => "#{get_host_port()}",
81+
"pipeline" => "apache-logs",
82+
"data_stream" => "false",
83+
"use_metadata" => true,
84+
}
85+
next LogStash::Outputs::ElasticSearch.new(settings)
86+
end
87+
88+
let(:http_client) { Manticore::Client.new }
89+
let(:ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/apache-logs" }
90+
let(:apache_logs_pipeline) {
91+
'
92+
{
93+
"description" : "Pipeline to parse Apache logs",
94+
"processors" : [
95+
{
96+
"grok": {
97+
"field": "message",
98+
"patterns": ["%{COMBINEDAPACHELOG}"]
99+
}
100+
}
101+
]
102+
}'
103+
}
104+
105+
let(:add_field_ingest_url) { "http://#{get_host_port()}/_ingest/pipeline/add-field" }
106+
let(:add_field_logs_pipeline) {
107+
'
108+
{
109+
"description": "Add field foo with value bar",
110+
"processors": [
111+
{
112+
"set": {
113+
"field": "foo",
114+
"value": "bar"
115+
}
116+
}
117+
]
118+
}'
119+
}
120+
121+
before :each do
122+
# Delete all templates first.
123+
require "elasticsearch"
124+
125+
# Clean ES of data before we start.
126+
@es = get_client
127+
@es.indices.delete_template(:name => "*")
128+
129+
# This can fail if there are no indexes, ignore failure.
130+
@es.indices.delete(:index => "*") rescue nil
131+
132+
# delete existing ingest pipeline
133+
http_client.delete(ingest_url).call
134+
135+
# register pipelines
136+
http_client.put(ingest_url, :body => apache_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
137+
http_client.put(add_field_ingest_url, :body => add_field_logs_pipeline, :headers => { "Content-Type" => "application/json" }).call
138+
139+
#TODO: Use esclient
140+
#@es.ingest.put_pipeline :id => 'apache_pipeline', :body => pipeline_defintion
141+
142+
subject.register
143+
subject.multi_receive([
144+
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"'),
145+
LogStash::Event.new("message" => '183.60.215.50 - - [01/Jun/2015:18:00:00 +0000] "GET /scripts/netcat-webserver HTTP/1.1" 200 182 "-" "Mozilla/5.0 (compatible; EasouSpider; +http://www.easou.com/search/spider.html)"', "@metadata" => { "_id" => "id1", "_index" => "index1", "pipeline" => "add-field" }),
146+
])
147+
@es.indices.refresh
148+
149+
#Wait or fail until everything's indexed.
150+
Stud::try(10.times) do
151+
r = @es.search(index: "logstash-*")
152+
expect(r).to have_hits(1)
153+
r = @es.search(index: "index1")
154+
expect(r).to have_hits(1)
155+
sleep(0.1)
156+
end
157+
end
158+
159+
it "indexes using the correct pipeline when @metadata.pipeline not defined" do
160+
results = @es.search(:index => "logstash-*", :q => "message:\"netcat\"")
161+
expect(results).to have_hits(1)
162+
expect(results["hits"]["hits"][0]["_source"]["response"]).to eq("200")
163+
expect(results["hits"]["hits"][0]["_source"]["bytes"]).to eq("182")
164+
expect(results["hits"]["hits"][0]["_source"]["verb"]).to eq("GET")
165+
expect(results["hits"]["hits"][0]["_source"]["request"]).to eq("/scripts/netcat-webserver")
166+
expect(results["hits"]["hits"][0]["_source"]["auth"]).to eq("-")
167+
expect(results["hits"]["hits"][0]["_source"]["ident"]).to eq("-")
168+
expect(results["hits"]["hits"][0]["_source"]["clientip"]).to eq("183.60.215.50")
169+
expect(results["hits"]["hits"][0]["_source"]["junkfieldaaaa"]).to eq(nil)
170+
end
171+
172+
it "indexes using the @metadata._index, @metadata._id, and @metadata.pipeline when defined" do
173+
results = @es.search(:index => "index1", :q => "message:\"netcat\"")
174+
expect(results).to have_hits(1)
175+
expect(results["hits"]["hits"][0]["_id"]).to eq("id1")
176+
expect(results["hits"]["hits"][0]["_index"]).to eq("index1")
177+
expect(results["hits"]["hits"][0]["_source"]["foo"]).to eq("bar")
178+
end
179+
end

0 commit comments

Comments
 (0)