Skip to content

Esql support #233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a307db2
ES|QL support: ESQL executor implementation, response type to accept …
mashhurs Apr 8, 2025
9c35f22
Merge with upstream, warn if query doesn't include METADATA which DSL…
mashhurs Apr 9, 2025
6f99055
Run unit tests with the LS version which actually supports the ES|QL.
mashhurs Apr 10, 2025
086a592
Add query type to the agent. DRY of supported ES/LS versions.
mashhurs Apr 10, 2025
e30e0f9
Remove query type from user-agent since it is useless, put back accid…
mashhurs Apr 10, 2025
7746c14
Initial docs added for ES|QL.
mashhurs Apr 10, 2025
76303d8
Update query to include condition with string.
mashhurs Apr 11, 2025
1fb29f7
Tested escaped chars cases, uses orignal query.
mashhurs Apr 12, 2025
5d47f2f
Integration tests added.
mashhurs Apr 14, 2025
c291e24
Skip the ESQL test if LS with the ES client which doesn't support ESQ…
mashhurs Apr 14, 2025
22e72e9
Add comments on response type and query params about ES|QL acceptance…
mashhurs Apr 14, 2025
af6e24a
Update spec/inputs/integration/elasticsearch_esql_spec.rb
mashhurs Apr 21, 2025
4ce6fa4
Integration test skip condition correction.
mashhurs Apr 21, 2025
4ed69ff
Introduce query_params option to accept drop_null_columns, set defaul…
mashhurs Apr 24, 2025
0725f98
Fix the failed integration test.
mashhurs Apr 25, 2025
cfb36f3
Request dropping null columns and filter out null values. Consider se…
mashhurs May 1, 2025
a92a71e
Apply suggestions from code review
mashhurs May 7, 2025
d4f559d
Apply code review suggestions: to use decorator as a proc call, doc s…
mashhurs May 7, 2025
65eb675
Rename warning msg field name to avoid conflicts. Generate a target a…
mashhurs May 8, 2025
789f467
Ignore sub-fields with warninigs and keep only parent.
mashhurs May 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.0
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)

## 5.1.0
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)

Expand Down
74 changes: 72 additions & 2 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,43 @@ The next scheduled run:
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
* updates the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-esql"]
==== ES|QL support
{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data.

To utilize the ES|QL feature with this plugin, the following version requirements must be met:
[cols="1,2",options="header"]
|===
|Component |Minimum version
|{es} |8.11.0 or newer
|{ls} |8.17.4 or newer
|This plugin |4.23.0+ (4.x series) or 5.2.0+ (5.x series)
|===

To configure ES|QL query in the plugin, set the `response_type` to `esql` and provide your ES|QL query in the `query` parameter.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels cumbersome to me.

Could we align with the proposal in the filter PR to provide an ESQL query with esql_query instead of requring the configuration of multiple separate parameters?

In this case, since the input plugin does require a JSON-encoded object for its query parameter when using the Query DSL, we could auto-detect that a given query parameter is ESQL (unlike the ES filter, which uses a QueryString query as its query parameter)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we had a discussion with @jsvd about this, we had a similar idea to deprecate this response_type and replace with query_type in the future. And through the experience as I do see, introducing new param is not a difficult, deprecation -> obseletion -> removal is a long headache process.
From this point of view, I would support adding minimal change but I am open to apply changes if anyone has strong opinion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left a separate note on how to do it.

I don't personally care much about removing the response_type right away, but if a user starts using ESQL I'd like them to not start new usages of a config that we'd like to deprecate.

Since this is effectively a rename, we can easily use the with_deprecated_alias helper from NormalizeConfigSupport.


IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.

The following is a basic scheduled ES|QL query that runs hourly:
[source, ruby]
input {
elasticsearch {
id => hourly_cron_job
hosts => [ 'https://..']
api_key => '....'
response_type => 'esql'
query => '
FROM food-index
| WHERE spicy_level = "hot" AND @timestamp > NOW() - 1 hour
| LIMIT 500'
schedule => '0 * * * *' # every hour at min 0
}
}

NOTE: With ES|QL query, {ls} doesn't generate `event.original`

For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

Expand Down Expand Up @@ -257,7 +294,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations","esql"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
Expand Down Expand Up @@ -507,17 +544,50 @@ the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the qu
[id="plugins-{type}s-{plugin}-response_type"]
===== `response_type`

* Value can be any of: `hits`, `aggregations`
* Value can be any of: `hits`, `aggregations`, `esql`
* Default value is `hits`

Which part of the result to transform into Logstash events when processing the
response from the query.

The default `hits` will generate one event per returned document (i.e. "hit").

When set to `aggregations`, a single Logstash event will be generated with the
contents of the `aggregations` object of the query's response. In this case the
`hits` object will be ignored. The parameter `size` will be always be set to
0 regardless of the default or user-defined value set in this plugin.

When using the `esql` setting, the query parameter must be a valid plaintext ES|QL string.
When this setting is active, `target`, `size`, `slices` and `search_api` parameters are ignored.
ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
The plugin maps each value entry to an event, populating corresponding fields.
For example, a query might produce a table like:

[cols="2,1,1,2",options="header"]
|===
|`timestamp` |`user_id` | `action` | `status_code`

|2025-04-10T12:00:00 |123 |login |200
|2025-04-10T12:05:00 |456 |purchase |403
|===

For this case, the plugin emits two events look like
[source, json]
[
{
"timestamp": "2025-04-10T12:00:00",
"user_id": 123,
"action": "login",
"status_code": 200
},
{
"timestamp": "2025-04-10T12:05:00",
"user_id": 456,
"action": "purchase",
"status_code": 403
}
]

[id="plugins-{type}s-{plugin}-request_timeout_seconds"]
===== `request_timeout_seconds`

Expand Down
56 changes: 51 additions & 5 deletions lib/logstash/inputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'
require 'logstash/inputs/elasticsearch/esql'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
Expand Down Expand Up @@ -104,7 +105,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# This allows you to speccify the response type: either hits or aggregations
# where hits: normal search request
# aggregations: aggregation request
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Migrating to query_type with auto-detection of ESQL queries would be pretty straight-forward with the NormalizeConfigSupport mixin:

Suggested change
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'
config :response_type, :validate => %w[hits aggregations], :deprecated => "use `query_type`"
config :query_type, :validate => %w[hits aggregations esql] # default depends on query shape
   def register
+    @query_type = normalize_config("query_type") do |normalizer|
+      normalizer.with_deprecated_alias("response_type")
+    end || (@query.start_with?('{') ? 'hits' : 'esql')

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to add the deprecation right after this ES|QL change.
One agreement we need to decide is naming. I personally do not like hits, aggregations along with esql. They indicate different contexts. I had options dsl_search, dsl_aggregation and esql.
Let me please know your opinion: I can either apply with change if we quickly come with agreement or create an issue follow up right after this PR.


# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
Expand Down Expand Up @@ -286,6 +287,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

LS_ESQL_SUPPORT_VERSION = "8.17.4" # the version started using elasticsearch-ruby v8
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def initialize(params={})
super(params)

Expand All @@ -302,10 +306,16 @@ def register
fill_hosts_from_cloud_id
setup_ssl_params!

@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
if @response_type == 'esql'
validate_ls_version_for_esql_support!
validate_esql_query!
inform_ineffective_esql_params
else
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
end

@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
Expand Down Expand Up @@ -341,6 +351,8 @@ def register

test_connection!

validate_es_for_esql_support!

setup_serverless

setup_search_api
Expand All @@ -364,6 +376,7 @@ def run(output_queue)
end

def get_query_object
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review note: moved to private area

return @query if @response_type == 'esql'
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
Expand Down Expand Up @@ -398,6 +411,12 @@ def event_from_hit(hit, root_field)
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end

def decorate_and_push_to_queue(output_queue, mapped_entry)
event = targeted_event_factory.new_event mapped_entry
decorate(event)
output_queue << event
end

def set_docinfo_fields(hit, event)
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
docinfo_target = event.get(@docinfo_target) || {}
Expand Down Expand Up @@ -675,6 +694,8 @@ def setup_query_executor
end
when 'aggregations'
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
when 'esql'
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
end
end

Expand Down Expand Up @@ -714,6 +735,31 @@ def get_transport_client_class
::Elastic::Transport::Transport::HTTP::Manticore
end

def validate_ls_version_for_esql_support!
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
end
end

def validate_esql_query!
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
source_commands = %w[FROM ROW SHOW]
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end

def inform_ineffective_esql_params
ineffective_options = original_params.keys & %w(target size slices search_api)
@logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.size > 1
end

def validate_es_for_esql_support!
return unless @response_type == 'esql'
# make sure connected ES supports ES|QL (8.11+)
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator
Expand Down
83 changes: 83 additions & 0 deletions lib/logstash/inputs/elasticsearch/esql.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
require 'logstash/helpers/loggable_try'

module LogStash
module Inputs
class Elasticsearch
class Esql
include LogStash::Util::Loggable

ESQL_JOB = "ES|QL job"

# Initialize the ESQL query executor
# @param client [Elasticsearch::Client] The Elasticsearch client instance
# @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
def initialize(client, plugin)
@client = client
@plugin_params = plugin.params
@plugin = plugin
@retries = @plugin_params["retries"]
@query = @plugin_params["query"]
unless @query.include?('METADATA')
logger.warn("The query doesn't have METADATA keyword. Including it makes _id and _version available in the documents", {:query => @query})
end
end

# Execute the ESQL query and process results
# @param output_queue [Queue] The queue to push processed events to
# @param query A query to be executed
def do_run(output_queue, query)
logger.info("ES|QL executor starting")
response = retryable(ESQL_JOB) do
@client.esql.query({ body: { query: query }, format: 'json' })
end
# retriable already printed error details
return if response == false

if response&.headers&.dig("warning")
logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]})
end
if response['values'] && response['columns']
process_response(response['values'], response['columns'], output_queue)
end
end

# Execute a retryable operation with proper error handling
# @param job_name [String] Name of the job for logging purposes
# @yield The block to execute
# @return [Boolean] true if successful, false otherwise
def retryable(job_name, &block)
stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
stud_try.try((@retries + 1).times) { yield }
rescue => e
error_details = {:message => e.message, :cause => e.cause}
error_details[:backtrace] = e.backtrace if logger.debug?
logger.error("#{job_name} failed with ", error_details)
false
end

private

# Process the ESQL response and push events to the output queue
# @param values [Array[Array]] The ESQL query response hits
# @param columns [Array[Hash]] The ESQL query response columns
# @param output_queue [Queue] The queue to push processed events to
def process_response(values, columns, output_queue)
values.each do |value|
mapped_data = map_column_and_values(columns, value)
@plugin.decorate_and_push_to_queue(output_queue, mapped_data)
end
end

# Map column names to their corresponding values
# @param columns [Array] Array of column definitions
# @param values [Array] Array of values for the current row
# @return [Hash] Mapped data with column names as keys
def map_column_and_values(columns, values)
columns.each_with_index.with_object({}) do |(column, index), mapped_data|
mapped_data[column["name"]] = values[index]
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion logstash-input-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-elasticsearch'
s.version = '5.1.0'
s.version = '5.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads query results from an Elasticsearch cluster"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
Loading