Skip to content

Commit 5abd4ca

Browse files
committed
ES|QL PoC for es-input.
1 parent d9bf375 commit 5abd4ca

File tree

2 files changed

+70
-61
lines changed

2 files changed

+70
-61
lines changed

lib/logstash/inputs/elasticsearch.rb

+3-61
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
7474
require 'logstash/inputs/elasticsearch/paginated_search'
7575
require 'logstash/inputs/elasticsearch/aggregation'
7676
require 'logstash/inputs/elasticsearch/cursor_tracker'
77+
require 'logstash/inputs/elasticsearch/esql'
7778

7879
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
7980
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -125,20 +126,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
125126
# by this pipeline input.
126127
config :slices, :validate => :number
127128

128-
# Enable tracking the value of a given field to be used as a cursor
129-
# Main concerns:
130-
# * using anything other than _event.timestamp easily leads to data loss
131-
# * the first "synchronization run can take a long time"
132-
config :tracking_field, :validate => :string
133-
134-
# Define the initial seed value of the tracking_field
135-
config :tracking_field_seed, :validate => :string, :default => "1970-01-01T00:00:00.000000000Z"
136-
137-
# The location of where the tracking field value will be stored
138-
# The value is persisted after each scheduled run (and not per result)
139-
# If it's not set it defaults to '${path.data}/plugins/inputs/elasticsearch/<pipeline_id>/last_run_value'
140-
config :last_run_metadata_path, :validate => :string
141-
142129
# If set, include Elasticsearch document information such as index, type, and
143130
# the id in the event.
144131
#
@@ -265,10 +252,6 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
265252
# exactly once.
266253
config :schedule, :validate => :string
267254

268-
# Allow scheduled runs to overlap (enabled by default). Setting to false will
269-
# only start a new scheduled run after the previous one completes.
270-
config :schedule_overlap, :validate => :boolean
271-
272255
# If set, the _source of each hit will be added nested under the target instead of at the top-level
273256
config :target, :validate => :field_reference
274257

@@ -347,30 +330,16 @@ def register
347330

348331
setup_query_executor
349332

350-
setup_cursor_tracker
351-
352333
@client
353334
end
354335

355336
def run(output_queue)
356337
if @schedule
357-
scheduler.cron(@schedule, :overlap => @schedule_overlap) do
358-
@query_executor.do_run(output_queue, get_query_object())
359-
end
338+
scheduler.cron(@schedule) { @query_executor.do_run(output_queue) }
360339
scheduler.join
361340
else
362-
@query_executor.do_run(output_queue, get_query_object())
363-
end
364-
end
365-
366-
def get_query_object
367-
if @cursor_tracker
368-
query = @cursor_tracker.inject_cursor(@query)
369-
@logger.debug("new query is #{query}")
370-
else
371-
query = @query
341+
@query_executor.do_run(output_queue)
372342
end
373-
LogStash::Json.load(query)
374343
end
375344

376345
##
@@ -380,11 +349,6 @@ def push_hit(hit, output_queue, root_field = '_source')
380349
event = event_from_hit(hit, root_field)
381350
decorate(event)
382351
output_queue << event
383-
record_last_value(event)
384-
end
385-
386-
def record_last_value(event)
387-
@cursor_tracker.record_last_value(event) if @tracking_field
388352
end
389353

390354
def event_from_hit(hit, root_field)
@@ -678,28 +642,6 @@ def setup_query_executor
678642
end
679643
end
680644

681-
def setup_cursor_tracker
682-
return unless @tracking_field
683-
return unless @query_executor.is_a?(LogStash::Inputs::Elasticsearch::SearchAfter)
684-
685-
if @resolved_search_api != "search_after" || @response_type != "hits"
686-
raise ConfigurationError.new("The `tracking_field` feature can only be used with `search_after` non-aggregation queries")
687-
end
688-
689-
@cursor_tracker = CursorTracker.new(last_run_metadata_path: last_run_metadata_path,
690-
tracking_field: @tracking_field,
691-
tracking_field_seed: @tracking_field_seed)
692-
@query_executor.cursor_tracker = @cursor_tracker
693-
end
694-
695-
def last_run_metadata_path
696-
return @last_run_metadata_path if @last_run_metadata_path
697-
698-
last_run_metadata_path = ::File.join(LogStash::SETTINGS.get_value("path.data"), "plugins", "inputs", "elasticsearch", pipeline_id, "last_run_value")
699-
FileUtils.mkdir_p ::File.dirname(last_run_metadata_path)
700-
last_run_metadata_path
701-
end
702-
703645
def get_transport_client_class
704646
# LS-core includes `elasticsearch` gem. The gem is composed of two separate gems: `elasticsearch-api` and `elasticsearch-transport`
705647
# And now `elasticsearch-transport` is old, instead we have `elastic-transport`.
+67
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
require 'logstash/helpers/loggable_try'

module LogStash
  module Inputs
    class Elasticsearch
      # Executes an ES|QL query against Elasticsearch and pushes each result
      # row onto the pipeline output queue as a { column_name => value } hash.
      #
      # PoC notes: `esql.params` is read from the plugin config but not yet
      # forwarded to the query, and `esql.filter` is not yet supported.
      class Esql
        include LogStash::Util::Loggable

        ESQL_JOB = "ES|QL job"

        # @param client [Elasticsearch::Client] connected ES client exposing `esql.query`
        # @param plugin [LogStash::Inputs::Elasticsearch] owning plugin instance; supplies
        #   config params and receives mapped rows via #decorate_and_push_to_queue
        def initialize(client, plugin)
          @client = client
          @plugin_params = plugin.params
          @plugin = plugin
          @retries = @plugin_params["retries"]

          @query = @plugin_params["query"]
          # `|| {}` instead of `x ? x : {}` — same truthiness semantics, idiomatic Ruby
          esql_options = @plugin_params["esql"] || {}
          @esql_params = esql_options["params"] || {}
          # TODO: add filter support as well
          # @esql_params = esql_options["filter"] | []
        end

        # Runs the given block up to @retries + 1 times via LoggableTry,
        # logging each failed attempt.
        #
        # @param job_name [String] label used in retry/error log messages
        # @return [Object] the block's result, or false when all attempts failed
        def retryable(job_name, &block)
          stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
          stud_try.try((@retries + 1).times) { yield }
        rescue => e
          error_details = {:message => e.message, :cause => e.cause}
          # backtraces are verbose; only attach them when debug logging is on
          error_details[:backtrace] = e.backtrace if logger.debug?
          logger.error("#{job_name} failed with ", error_details)
          false
        end

        # Executes the configured ES|QL query (with retries) and pushes every
        # result row to +output_queue+ through the owning plugin.
        # Debug output goes through the logger rather than stdout.
        def do_run(output_queue)
          logger.info("ES|QL executor starting")
          response = retryable(ESQL_JOB) do
            # TODO: make sure to add params, filters, etc...
            # @client.esql.query({ body: { query: @query }, format: 'json' }.merge(@esql_params))
            @client.esql.query({ body: { query: @query }, format: 'json' })
          end
          # retryable returns false on exhaustion; a successful response is
          # expected to carry 'columns' and 'values' arrays
          return unless response && response['values']

          logger.debug? && logger.debug("ES|QL query response", :response => response.inspect)
          response['values'].each do |row|
            mapped_data = map_column_and_values(response['columns'], row)
            logger.debug? && logger.debug("Mapped ES|QL row", :row => mapped_data)
            @plugin.decorate_and_push_to_queue(output_queue, mapped_data)
          end
        end

        # Zips ES|QL column metadata with a single row of values.
        #
        # @param columns [Array<Hash>] column descriptors, each with a "name" key
        # @param values [Array] one row of values, positionally aligned with +columns+
        # @return [Hash] { column_name => value }
        def map_column_and_values(columns, values)
          columns.each_with_index.with_object({}) do |(column, index), mapped|
            mapped[column["name"]] = values[index]
          end
        end
      end
    end
  end
end

0 commit comments

Comments
 (0)