Skip to content

Commit bd67dab

Browse files
committed
ES|QL support: ESQL executor implementation, response type to accept esql option, validations to make sure both LS and ES support the ESQL execution.
1 parent 5abd4ca commit bd67dab

File tree

4 files changed

+201
-26
lines changed

4 files changed

+201
-26
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.2.0
2+
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)
3+
14
## 5.1.0
25
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)
36

lib/logstash/inputs/elasticsearch.rb

+47-4
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,17 @@ def register
285285
fill_hosts_from_cloud_id
286286
setup_ssl_params!
287287

288-
@base_query = LogStash::Json.load(@query)
289-
if @slices
290-
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
291-
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
288+
if @response_type == 'esql'
289+
validate_ls_version_for_esql_support!
290+
validate_esql_query!
291+
inform_ineffective_esql_params
292+
else
293+
# for the ES|QL, plugin accepts raw string query but JSON for others
294+
@base_query = LogStash::Json.load(@query)
295+
if @slices
296+
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
297+
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
298+
end
292299
end
293300

294301
@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -324,6 +331,9 @@ def register
324331

325332
test_connection!
326333

334+
# make sure connected ES supports ES|QL (8.11+)
335+
validate_es_for_esql_support! if @response_type == 'esql'
336+
327337
setup_serverless
328338

329339
setup_search_api
@@ -362,6 +372,12 @@ def event_from_hit(hit, root_field)
362372
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
363373
end
364374

375+
def decorate_and_push_to_queue(output_queue, mapped_entry)
376+
event = targeted_event_factory.new_event mapped_entry
377+
decorate(event)
378+
output_queue << event
379+
end
380+
365381
def set_docinfo_fields(hit, event)
366382
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
367383
docinfo_target = event.get(@docinfo_target) || {}
@@ -639,6 +655,8 @@ def setup_query_executor
639655
end
640656
when 'aggregations'
641657
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
658+
when 'esql'
659+
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
642660
end
643661
end
644662

@@ -656,6 +674,31 @@ def get_transport_client_class
656674
::Elastic::Transport::Transport::HTTP::Manticore
657675
end
658676

677+
def validate_ls_version_for_esql_support!
678+
# LS 8.17.4+ has elasticsearch-ruby 8.17 client
679+
# elasticsearch-ruby 8.11+ supports ES|QL
680+
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create("8.17.4")
681+
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4")
682+
end
683+
end
684+
685+
def validate_esql_query!
686+
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
687+
source_commands = %w[FROM ROW SHOW]
688+
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
689+
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
690+
end
691+
692+
def inform_ineffective_esql_params
693+
ineffective_options = original_params.keys & %w(target size slices search_api)
694+
@logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1
695+
end
696+
697+
def validate_es_for_esql_support!
698+
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create("8.11")
699+
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. Please upgrade it.") unless es_supports_esql
700+
end
701+
659702
module URIOrEmptyValidator
660703
##
661704
# @override to provide :uri_or_empty validator

lib/logstash/inputs/elasticsearch/esql.rb

+36-22
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,21 @@ class Esql
88

99
ESQL_JOB = "ES|QL job"
1010

11+
# Initialize the ESQL query executor
12+
# @param client [Elasticsearch::Client] The Elasticsearch client instance
13+
# @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
1114
def initialize(client, plugin)
1215
@client = client
1316
@plugin_params = plugin.params
1417
@plugin = plugin
1518
@retries = @plugin_params["retries"]
16-
1719
@query = @plugin_params["query"]
18-
esql_options = @plugin_params["esql"] ? @plugin_params["esql"]: {}
19-
@esql_params = esql_options["params"] ? esql_options["params"] : {}
20-
# TODO: add filter as well
21-
# @esql_params = esql_options["filter"] | []
2220
end
2321

22+
# Execute a retryable operation with proper error handling
23+
# @param job_name [String] Name of the job for logging purposes
24+
# @yield The block to execute
25+
# @return [Boolean] true if successful, false otherwise
2426
def retryable(job_name, &block)
2527
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
2628
stud_try.try((@retries + 1).times) { yield }
@@ -31,35 +33,47 @@ def retryable(job_name, &block)
3133
false
3234
end
3335

36+
# Execute the ESQL query and process results
37+
# @param output_queue [Queue] The queue to push processed events to
3438
def do_run(output_queue)
3539
logger.info("ES|QL executor starting")
3640
response = retryable(ESQL_JOB) do
3741
@client.esql.query({ body: { query: @query }, format: 'json' })
38-
# TODO: make sure to add params, filters, etc...
39-
# @client.esql.query({ body: { query: @query }, format: 'json' }.merge(@esql_params))
42+
end
43+
# retriable already printed error details
44+
return if response == false
4045

46+
puts "response class: #{response.class}"
47+
puts "response: #{response.inspect}"
48+
unless response&.headers&.dig("warning")
49+
logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]})
50+
end
51+
if response['values'] && response['columns']
52+
process_response(response['values'], response['columns'], output_queue)
4153
end
42-
puts "Response: #{response.inspect}"
43-
if response && response['values']
44-
response['values'].each do |value|
45-
mapped_data = map_column_and_values(response['columns'], value)
46-
puts "Mapped Data: #{mapped_data}"
47-
@plugin.decorate_and_push_to_queue(output_queue, mapped_data)
48-
end
54+
end
55+
56+
private
57+
58+
# Process the ESQL response and push events to the output queue
59+
# @param values [Array[Array]] The ESQL query response hits
60+
# @param columns [Array[Hash]] The ESQL query response columns
61+
# @param output_queue [Queue] The queue to push processed events to
62+
def process_response(values, columns, output_queue)
63+
values.each do |value|
64+
mapped_data = map_column_and_values(columns, value)
65+
@plugin.decorate_and_push_to_queue(output_queue, mapped_data)
4966
end
5067
end
5168

69+
# Map column names to their corresponding values
70+
# @param columns [Array] Array of column definitions
71+
# @param values [Array] Array of values for the current row
72+
# @return [Hash] Mapped data with column names as keys
5273
def map_column_and_values(columns, values)
53-
puts "columns class: #{columns.class}"
54-
puts "values class: #{values.class}"
55-
puts "columns: #{columns.inspect}"
56-
puts "values: #{values.inspect}"
57-
mapped_data = {}
58-
columns.each_with_index do |column, index|
74+
columns.each_with_index.with_object({}) do |(column, index), mapped_data|
5975
mapped_data[column["name"]] = values[index]
6076
end
61-
puts "values: #{mapped_data.inspect}"
62-
mapped_data
6377
end
6478
end
6579
end
+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# encoding: utf-8
2+
require "logstash/devutils/rspec/spec_helper"
3+
require "logstash/inputs/elasticsearch"
4+
require "elasticsearch"
5+
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
6+
7+
describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do
8+
let(:plugin) { described_class.new(config) }
9+
let(:client) { instance_double(Elasticsearch::Client) }
10+
let(:queue) { Queue.new }
11+
let(:cluster_info) { { "version" => { "number" => "8.11.0", "build_flavor" => "default" } } }
12+
13+
let(:config) do
14+
{
15+
"query" => "FROM test-index | STATS count() BY field",
16+
"response_type" => "esql",
17+
"retries" => 3
18+
}
19+
end
20+
21+
describe "#initialize" do
22+
it "sets up the ESQL client with correct parameters" do
23+
expect(plugin.instance_variable_get(:@query)).to eq(config["query"])
24+
expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"])
25+
expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"])
26+
end
27+
end
28+
29+
describe "#register" do
30+
before(:each) do
31+
Elasticsearch::Client.send(:define_method, :ping) { }
32+
allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
33+
end
34+
it "creates ES|QL executor" do
35+
plugin.register
36+
expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql)
37+
end
38+
end
39+
40+
describe "#validation" do
41+
42+
describe "LS version" do
43+
context "when compatible" do
44+
before(:each) do
45+
stub_const("LogStash::VERSION", "8.11.0")
46+
end
47+
48+
it "does not raise an error" do
49+
expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error
50+
end
51+
end
52+
53+
context "when incompatible" do
54+
before(:each) do
55+
stub_const("LOGSTASH_VERSION", "8.10.0")
56+
end
57+
58+
it "raises a runtime error" do
59+
expect { plugin.send(:validate_ls_version_for_esql_support!) }
60+
.to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/)
61+
end
62+
end
63+
end
64+
65+
describe "ES version" do
66+
before(:each) do
67+
allow(plugin).to receive(:es_version).and_return("8.10.5")
68+
end
69+
70+
context "when incompatible" do
71+
it "raises a runtime error" do
72+
expect { plugin.send(:validate_es_for_esql_support!) }
73+
.to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./)
74+
end
75+
end
76+
end
77+
78+
describe "ES|QL query" do
79+
context "when query is valid" do
80+
it "does not raise an error" do
81+
expect { plugin.send(:validate_esql_query!) }.not_to raise_error
82+
end
83+
end
84+
85+
context "when query is empty" do
86+
let(:config) do
87+
{
88+
"query" => " "
89+
}
90+
end
91+
92+
# TODO: make shared spec
93+
it "raises a configuration error" do
94+
expect { plugin.send(:validate_esql_query!) }
95+
.to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/)
96+
end
97+
end
98+
99+
context "when query doesn't align with ES syntax" do
100+
let(:config) do
101+
{
102+
"query" => "RANDOM query"
103+
}
104+
end
105+
106+
it "raises a configuration error" do
107+
source_commands = %w[FROM ROW SHOW]
108+
expect { plugin.send(:validate_esql_query!) }
109+
.to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}")
110+
end
111+
end
112+
end
113+
end
114+
115+
end

0 commit comments

Comments
 (0)