Skip to content

Commit 52a97c2

Browse files
committed
Move ESQL executor retriable method to private. Unit tests are organized in a better way.
1 parent bd67dab commit 52a97c2

File tree

3 files changed

+184
-99
lines changed

3 files changed

+184
-99
lines changed

Diff for: lib/logstash/inputs/elasticsearch/esql.rb

+15-17
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,6 @@ def initialize(client, plugin)
1919
@query = @plugin_params["query"]
2020
end
2121

22-
# Execute a retryable operation with proper error handling
23-
# @param job_name [String] Name of the job for logging purposes
24-
# @yield The block to execute
25-
# @return [Boolean] true if successful, false otherwise
26-
def retryable(job_name, &block)
27-
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
28-
stud_try.try((@retries + 1).times) { yield }
29-
rescue => e
30-
error_details = {:message => e.message, :cause => e.cause}
31-
error_details[:backtrace] = e.backtrace if logger.debug?
32-
logger.error("#{job_name} failed with ", error_details)
33-
false
34-
end
35-
3622
# Execute the ESQL query and process results
3723
# @param output_queue [Queue] The queue to push processed events to
3824
def do_run(output_queue)
@@ -43,16 +29,28 @@ def do_run(output_queue)
4329
# retriable already printed error details
4430
return if response == false
4531

46-
puts "response class: #{response.class}"
47-
puts "response: #{response.inspect}"
48-
unless response&.headers&.dig("warning")
32+
if response&.headers&.dig("warning")
4933
logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]})
5034
end
5135
if response['values'] && response['columns']
5236
process_response(response['values'], response['columns'], output_queue)
5337
end
5438
end
5539

40+
# Execute a retryable operation with proper error handling
41+
# @param job_name [String] Name of the job for logging purposes
42+
# @yield The block to execute
43+
# @return [Boolean] true if successful, false otherwise
44+
def retryable(job_name, &block)
45+
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
46+
stud_try.try((@retries + 1).times) { yield }
47+
rescue => e
48+
error_details = {:message => e.message, :cause => e.cause}
49+
error_details[:backtrace] = e.backtrace if logger.debug?
50+
logger.error("#{job_name} failed with ", error_details)
51+
false
52+
end
53+
5654
private
5755

5856
# Process the ESQL response and push events to the output queue

Diff for: spec/inputs/elasticsearch_esql_spec.rb

+63-82
Original file line numberDiff line numberDiff line change
@@ -2,114 +2,95 @@
22
require "logstash/devutils/rspec/spec_helper"
33
require "logstash/inputs/elasticsearch"
44
require "elasticsearch"
5-
require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
65

7-
describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do
8-
let(:plugin) { described_class.new(config) }
6+
describe LogStash::Inputs::Elasticsearch::Esql do
97
let(:client) { instance_double(Elasticsearch::Client) }
10-
let(:queue) { Queue.new }
11-
let(:cluster_info) { { "version" => { "number" => "8.11.0", "build_flavor" => "default" } } }
12-
13-
let(:config) do
8+
let(:esql_client) { double("esql-client") }
9+
let(:plugin) { instance_double(LogStash::Inputs::Elasticsearch, params: plugin_config) }
10+
let(:plugin_config) do
1411
{
1512
"query" => "FROM test-index | STATS count() BY field",
16-
"response_type" => "esql",
1713
"retries" => 3
1814
}
1915
end
16+
let(:esql_executor) { described_class.new(client, plugin) }
2017

21-
describe "#initialize" do
18+
describe "when initializes" do
2219
it "sets up the ESQL client with correct parameters" do
23-
expect(plugin.instance_variable_get(:@query)).to eq(config["query"])
24-
expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"])
25-
expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"])
20+
expect(esql_executor.instance_variable_get(:@query)).to eq(plugin_config["query"])
21+
expect(esql_executor.instance_variable_get(:@retries)).to eq(plugin_config["retries"])
2622
end
2723
end
2824

29-
describe "#register" do
30-
before(:each) do
31-
Elasticsearch::Client.send(:define_method, :ping) { }
32-
allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
25+
describe "when faces error while retrying" do
26+
it "retries the given block the specified number of times" do
27+
attempts = 0
28+
result = esql_executor.retryable("Test Job") do
29+
attempts += 1
30+
raise StandardError if attempts < 3
31+
"success"
32+
end
33+
expect(attempts).to eq(3)
34+
expect(result).to eq("success")
3335
end
34-
it "creates ES|QL executor" do
35-
plugin.register
36-
expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql)
36+
37+
it "returns false if the block fails all attempts" do
38+
result = esql_executor.retryable("Test Job") do
39+
raise StandardError
40+
end
41+
expect(result).to eq(false)
3742
end
3843
end
3944

40-
describe "#validation" do
45+
describe "when executing chain of processes" do
46+
let(:output_queue) { Queue.new }
47+
let(:response) { { 'values' => [%w[foo bar]], 'columns' => [{ 'name' => 'id'}, { 'name' => 'val'}] } }
4148

42-
describe "LS version" do
43-
context "when compatible" do
44-
before(:each) do
45-
stub_const("LogStash::VERSION", "8.11.0")
46-
end
47-
48-
it "does not raise an error" do
49-
expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error
50-
end
51-
end
52-
53-
context "when incompatible" do
54-
before(:each) do
55-
stub_const("LOGSTASH_VERSION", "8.10.0")
56-
end
49+
before do
50+
allow(esql_executor).to receive(:retryable).and_yield
51+
allow(client).to receive_message_chain(:esql, :query).and_return(response)
52+
allow(plugin).to receive(:decorate_and_push_to_queue)
53+
end
5754

58-
it "raises a runtime error" do
59-
expect { plugin.send(:validate_ls_version_for_esql_support!) }
60-
.to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/)
61-
end
62-
end
55+
it "executes the ESQL query and processes the results" do
56+
allow(response).to receive(:headers).and_return({})
57+
esql_executor.do_run(output_queue)
58+
expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
6359
end
6460

65-
describe "ES version" do
66-
before(:each) do
67-
allow(plugin).to receive(:es_version).and_return("8.10.5")
68-
end
61+
it "logs a warning if the response contains a warning header" do
62+
allow(response).to receive(:headers).and_return({"warning" => "some warning"})
63+
expect(esql_executor.logger).to receive(:warn).with("ES|QL executor received warning", {:message => "some warning"})
64+
esql_executor.do_run(output_queue)
65+
end
6966

70-
context "when incompatible" do
71-
it "raises a runtime error" do
72-
expect { plugin.send(:validate_es_for_esql_support!) }
73-
.to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./)
74-
end
75-
end
67+
it "does not log a warning if the response does not contain a warning header" do
68+
allow(response).to receive(:headers).and_return({})
69+
expect(esql_executor.logger).not_to receive(:warn)
70+
esql_executor.do_run(output_queue)
7671
end
72+
end
7773

78-
describe "ES|QL query" do
79-
context "when query is valid" do
80-
it "does not raise an error" do
81-
expect { plugin.send(:validate_esql_query!) }.not_to raise_error
82-
end
83-
end
8474

85-
context "when query is empty" do
86-
let(:config) do
87-
{
88-
"query" => " "
89-
}
90-
end
75+
describe "when starts processing the response" do
76+
let(:output_queue) { Queue.new }
77+
let(:values) { [%w[foo bar]] }
78+
let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] }
9179

92-
# TODO: make shared spec
93-
it "raises a configuration error" do
94-
expect { plugin.send(:validate_esql_query!) }
95-
.to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/)
96-
end
97-
end
80+
it "processes the ESQL response and pushes events to the output queue" do
81+
allow(plugin).to receive(:decorate_and_push_to_queue)
82+
esql_executor.send(:process_response, values, columns, output_queue)
83+
expect(plugin).to have_received(:decorate_and_push_to_queue).with(output_queue, {'id' => 'foo', 'val' => 'bar'})
84+
end
85+
end
9886

99-
context "when query doesn't align with ES syntax" do
100-
let(:config) do
101-
{
102-
"query" => "RANDOM query"
103-
}
104-
end
87+
describe "when maps column and values" do
88+
let(:columns) { [{'name' => 'id'}, {'name' => 'val'}] }
89+
let(:values) { %w[foo bar] }
10590

106-
it "raises a configuration error" do
107-
source_commands = %w[FROM ROW SHOW]
108-
expect { plugin.send(:validate_esql_query!) }
109-
.to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}")
110-
end
111-
end
91+
it "maps column names to their corresponding values" do
92+
result = esql_executor.send(:map_column_and_values, columns, values)
93+
expect(result).to eq({'id' => 'foo', 'val' => 'bar'})
11294
end
11395
end
114-
115-
end
96+
end

Diff for: spec/inputs/elasticsearch_spec.rb

+106
Original file line numberDiff line numberDiff line change
@@ -1370,4 +1370,110 @@ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Clie
13701370
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
13711371
end
13721372

1373+
context "#ESQL" do
1374+
let(:config) do
1375+
{
1376+
"query" => "FROM test-index | STATS count() BY field",
1377+
"response_type" => "esql",
1378+
"retries" => 3
1379+
}
1380+
end
1381+
let(:es_version) { "8.11.0" }
1382+
1383+
before(:each) do
1384+
# ES|QL supported |elasticsearch-ruby v8 client is available from 8.17.4
1385+
# this is a safeguard to let tests succeed in <8.17.4 versions, see validation test cases for unsupported behavior
1386+
stub_const("LOGSTASH_VERSION", "8.17.4")
1387+
end
1388+
1389+
describe "#initialize" do
1390+
it "sets up the ESQL client with correct parameters" do
1391+
expect(plugin.instance_variable_get(:@query)).to eq(config["query"])
1392+
expect(plugin.instance_variable_get(:@response_type)).to eq(config["response_type"])
1393+
expect(plugin.instance_variable_get(:@retries)).to eq(config["retries"])
1394+
end
1395+
end
1396+
1397+
describe "#register" do
1398+
before(:each) do
1399+
Elasticsearch::Client.send(:define_method, :ping) { }
1400+
allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
1401+
end
1402+
it "creates ES|QL executor" do
1403+
plugin.register
1404+
expect(plugin.instance_variable_get(:@query_executor)).to be_an_instance_of(LogStash::Inputs::Elasticsearch::Esql)
1405+
end
1406+
end
1407+
1408+
describe "#validation" do
1409+
1410+
describe "LS version" do
1411+
context "when compatible" do
1412+
1413+
it "does not raise an error" do
1414+
expect { plugin.send(:validate_ls_version_for_esql_support!) }.not_to raise_error
1415+
end
1416+
end
1417+
1418+
context "when incompatible" do
1419+
before(:each) do
1420+
stub_const("LOGSTASH_VERSION", "8.10.0")
1421+
end
1422+
1423+
it "raises a runtime error" do
1424+
expect { plugin.send(:validate_ls_version_for_esql_support!) }
1425+
.to raise_error(RuntimeError, /Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least 8.17.4/)
1426+
end
1427+
end
1428+
end
1429+
1430+
describe "ES version" do
1431+
before(:each) do
1432+
allow(plugin).to receive(:es_version).and_return("8.10.5")
1433+
end
1434+
1435+
context "when incompatible" do
1436+
it "raises a runtime error" do
1437+
expect { plugin.send(:validate_es_for_esql_support!) }
1438+
.to raise_error(RuntimeError, /Connected Elasticsearch 8.10.5 version does not supports ES|QL. Please upgrade it./)
1439+
end
1440+
end
1441+
end
1442+
1443+
describe "ES|QL query" do
1444+
context "when query is valid" do
1445+
it "does not raise an error" do
1446+
expect { plugin.send(:validate_esql_query!) }.not_to raise_error
1447+
end
1448+
end
1449+
1450+
context "when query is empty" do
1451+
let(:config) do
1452+
{
1453+
"query" => " "
1454+
}
1455+
end
1456+
1457+
it "raises a configuration error" do
1458+
expect { plugin.send(:validate_esql_query!) }
1459+
.to raise_error(LogStash::ConfigurationError, /`query` cannot be empty/)
1460+
end
1461+
end
1462+
1463+
context "when query doesn't align with ES syntax" do
1464+
let(:config) do
1465+
{
1466+
"query" => "RANDOM query"
1467+
}
1468+
end
1469+
1470+
it "raises a configuration error" do
1471+
source_commands = %w[FROM ROW SHOW]
1472+
expect { plugin.send(:validate_esql_query!) }
1473+
.to raise_error(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}")
1474+
end
1475+
end
1476+
end
1477+
end
1478+
end
13731479
end

0 commit comments

Comments
 (0)