Skip to content

Commit 397921d

Browse files
kaisechengyaauie
andauthored
Add support for pipeline to decode from metadata target_ingest_pipeline (#1113)
This commit allows `pipeline` option to take value from [@metadata][target_ingest_pipeline] if `pipeline` is not set. If [@metadata][target_ingest_pipeline] gives empty string, the pipeline won't be set Fixes: #1112 Co-authored-by: Ry Biesemeyer <[email protected]>
1 parent 815594d commit 397921d

File tree

5 files changed

+80
-16
lines changed

5 files changed

+80
-16
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.13.0
2+
- add technology preview support for allowing events to individually encode a default pipeline with `[@metadata][target_ingest_pipeline]` (as part of a technology preview, this feature may change without notice) [#1113](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1113)
3+
14
## 11.12.4
25
- Changed the `manage_template` default value to `false` when data streams is enabled [#1111](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1111)
36
- Added the `manage_template => false` as a valid data stream option

docs/index.asciidoc

+3-4
Original file line numberDiff line numberDiff line change
@@ -849,12 +849,11 @@ not also set this field. That will raise an error at startup
849849
===== `pipeline`
850850

851851
* Value type is <<string,string>>
852-
* Default value is `nil`
852+
* There is no default value.
853853

854854
Set which ingest pipeline you wish to execute for an event. You can also use
855-
event dependent configuration here like `pipeline =>
856-
"%{[@metadata][pipeline]}"`. The pipeline parameter won't be set if the value
857-
resolves to empty string ("").
855+
event dependent configuration here like `pipeline => "%{[@metadata][pipeline]}"`.
856+
The pipeline parameter won't be set if the value resolves to empty string ("").
858857

859858
[id="plugins-{type}s-{plugin}-pool_max"]
860859
===== `pool_max`

lib/logstash/outputs/elasticsearch.rb

+12-9
Original file line numberDiff line numberDiff line change
@@ -524,19 +524,22 @@ def common_event_params(event)
524524
routing_field_name => @routing ? event.sprintf(@routing) : nil
525525
}
526526

527-
if @pipeline
528-
value = event.sprintf(@pipeline)
529-
# convention: empty string equates to not using a pipeline
530-
# this is useful when using a field reference in the pipeline setting, e.g.
531-
# elasticsearch {
532-
# pipeline => "%{[@metadata][pipeline]}"
533-
# }
534-
params[:pipeline] = value unless value.empty?
535-
end
527+
target_pipeline = resolve_pipeline(event)
528+
# convention: empty string equates to not using a pipeline
529+
# this is useful when using a field reference in the pipeline setting, e.g.
530+
# elasticsearch {
531+
# pipeline => "%{[@metadata][pipeline]}"
532+
# }
533+
params[:pipeline] = target_pipeline unless (target_pipeline.nil? || target_pipeline.empty?)
536534

537535
params
538536
end
539537

538+
def resolve_pipeline(event)
539+
pipeline_template = @pipeline || event.get("[@metadata][target_ingest_pipeline]")&.to_s
540+
pipeline_template && event.sprintf(pipeline_template)
541+
end
542+
540543
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
541544

542545
@@plugins.each do |plugin|

logstash-output-elasticsearch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.12.4'
3+
s.version = '11.13.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/unit/outputs/elasticsearch_spec.rb

+61-2
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@
590590

591591
let(:event) { LogStash::Event.new("pipeline" => "my-ingest-pipeline") }
592592

593-
it "should interpolate the pipeline value and set it" do
593+
it "interpolate the pipeline value and set it" do
594594
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline")
595595
end
596596
end
@@ -600,7 +600,66 @@
600600

601601
let(:event) { LogStash::Event.new("pipeline" => "") }
602602

603-
it "should interpolate the pipeline value but not set it because it is empty" do
603+
it "interpolates the pipeline value but not set it because it is empty" do
604+
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
605+
end
606+
end
607+
608+
context "with both pipeline and target_ingest_pipeline" do
609+
let(:options) { {"pipeline" => "%{pipeline}" } }
610+
let(:event) { LogStash::Event.new({"pipeline" => "my-ingest-pipeline", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }
611+
612+
it "interpolates the plugin's pipeline value" do
613+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-ingest-pipeline")
614+
end
615+
616+
context "when the plugin's `pipeline` is constant" do
617+
let(:options) { super().merge("pipeline" => "my-constant-pipeline") }
618+
it "uses plugin's pipeline value" do
619+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "my-constant-pipeline")
620+
end
621+
end
622+
623+
context "when the plugin's `pipeline` includes an unresolvable sprintf placeholder" do
624+
let(:options) { super().merge("pipeline" => "reference-%{unset}-field") }
625+
it "does not use the target_ingest_pipeline" do
626+
# when sprintf doesn't resolve a placeholder, the behaviour of our `pipeline` is UNSPECIFIED.
627+
# here we only validate that the presence of the magic field does not
628+
# override an explicitly-configured pipeline.
629+
expect(subject.send(:event_action_tuple, event)[1]).to_not include(:pipeline => "my-ingest-pipeline")
630+
end
631+
end
632+
end
633+
634+
context "with empty pipeline and target_ingest_pipeline" do
635+
let(:options) { {"pipeline" => "%{pipeline}" } }
636+
let(:event) { LogStash::Event.new({"pipeline" => "", "[@metadata][target_ingest_pipeline]" => "meta-ingest-pipeline"}) }
637+
638+
it "interpolates the pipeline value but not set it because pipeline is empty" do
639+
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
640+
end
641+
end
642+
643+
context "with target_ingest_pipeline" do
644+
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"target_ingest_pipeline" => "meta-ingest-pipeline"}}) }
645+
646+
it "interpolates the target_ingest_pipeline value and set it" do
647+
expect(subject.send(:event_action_tuple, event)[1]).to include(:pipeline => "meta-ingest-pipeline")
648+
end
649+
end
650+
651+
context "with empty target_ingest_pipeline" do
652+
let(:event) { LogStash::Event.new({"pipeline" => "", "@metadata" => {"host" => "elastic"}}) }
653+
654+
it "does not set pipeline" do
655+
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
656+
end
657+
end
658+
659+
context "with empty pipeline and empty target_ingest_pipeline" do
660+
let(:event) { LogStash::Event.new }
661+
662+
it "does not set pipeline" do
604663
expect(subject.send(:event_action_tuple, event)[1]).not_to include(:pipeline)
605664
end
606665
end

0 commit comments

Comments
 (0)