Esql support #233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 11 commits into base: main
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 5.2.0
- ES|QL support [#233](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/233)

## 5.1.0
- Add "cursor"-like index tracking [#205](https://github.com/logstash-plugins/logstash-input-elasticsearch/pull/205)

77 changes: 75 additions & 2 deletions docs/index.asciidoc
@@ -230,6 +230,46 @@ The next scheduled run:
* uses {ref}/point-in-time-api.html#point-in-time-api[Point in time (PIT)] + {ref}/paginate-search-results.html#search-after[Search after] to paginate through all the data, and
* updates the value of the field at the end of the pagination.

[id="plugins-{type}s-{plugin}-esql"]
==== ES|QL support
{es} Query Language (ES|QL) provides a SQL-like interface for querying your {es} data.

To use the ES|QL feature with this plugin, the following version requirements must be met:
[cols="1,2",options="header"]
|===
|Component |Minimum version
|{es} |8.11.0 or newer
|{ls} |8.17.4 or newer
|This plugin |4.23.0+ (4.x series) or 5.2.0+ (5.x series)
|===

review note (Contributor Author): before releasing this change, I will backport and release 4.23.0 first

To configure an ES|QL query in the plugin, set `response_type` to `esql` and provide your ES|QL query in the `query` parameter.

IMPORTANT: We recommend understanding https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-limitations.html[ES|QL current limitations] before using it in production environments.

The following is a basic scheduled ES|QL query that runs hourly:
[source, ruby]
    input {
      elasticsearch {
        id => 'hourly_cron_job'
        hosts => [ 'https://..']
        api_key => '....'
        response_type => 'esql'
        query => '
          FROM food-index
          | WHERE spicy_level == "hot" AND @timestamp > NOW() - 1 hour
          | LIMIT 500
        '
        schedule => '0 * * * *' # every hour at min 0
      }
    }

Set `config.support_escapes: true` in `logstash.yml` if you need to escape special characters in the query.
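
For example, with escape support enabled, a double-quoted `query` string can carry escaped quotes and newlines. The following is only a minimal sketch, reusing the hypothetical `food-index` from the example above:
[source, ruby]
    input {
      elasticsearch {
        hosts => [ 'https://..']
        response_type => 'esql'
        # with config.support_escapes: true, \n and \" are interpreted inside double-quoted strings
        query => "FROM food-index\n| WHERE spicy_level == \"hot\"\n| LIMIT 500"
      }
    }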

NOTE: With an ES|QL query, {ls} does not generate `event.original`.
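
ES|QL queries can also expose document metadata. The plugin logs a warning when the query omits the `METADATA` keyword, since including it makes the `_id` and `_version` fields available in the resulting events. A brief sketch of such a query, again using the hypothetical `food-index`:
[source, ruby]
    query => '
      FROM food-index METADATA _id, _version
      | LIMIT 500
    '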

For comprehensive ES|QL syntax reference and best practices, see the https://www.elastic.co/guide/en/elasticsearch/reference/current/esql-syntax.html[official {es} documentation].

[id="plugins-{type}s-{plugin}-options"]
==== Elasticsearch Input configuration options

@@ -257,7 +297,7 @@ Please check out <<plugins-{type}s-{plugin}-obsolete-options>> for details.
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-proxy>> |<<uri,uri>>|No
| <<plugins-{type}s-{plugin}-query>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations"]`|No
| <<plugins-{type}s-{plugin}-response_type>> |<<string,string>>, one of `["hits","aggregations","esql"]`|No
| <<plugins-{type}s-{plugin}-request_timeout_seconds>> | <<number,number>>|No
| <<plugins-{type}s-{plugin}-schedule>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-schedule_overlap>> |<<boolean,boolean>>|No
@@ -507,17 +547,50 @@ the default sort `'{ "sort": { "_shard_doc": "asc" } }'` will be added to the query
[id="plugins-{type}s-{plugin}-response_type"]
===== `response_type`

* Value can be any of: `hits`, `aggregations`
* Value can be any of: `hits`, `aggregations`, `esql`
* Default value is `hits`

Which part of the result to transform into Logstash events when processing the
response from the query.

The default `hits` will generate one event per returned document (i.e. "hit").

When set to `aggregations`, a single Logstash event will be generated with the
contents of the `aggregations` object of the query's response. In this case the
`hits` object will be ignored. The parameter `size` will always be set to
0, regardless of the default or user-defined value set in this plugin.
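
For example, a minimal sketch of an `aggregations` configuration, assuming a hypothetical keyword field named `status`:
[source, ruby]
    input {
      elasticsearch {
        hosts => [ 'https://..']
        response_type => 'aggregations'
        query => '{ "aggs": { "by_status": { "terms": { "field": "status" } } } }'
      }
    }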

When using the `esql` setting, the `query` parameter must be a valid plaintext ES|QL string.
When this setting is active, the `target`, `size`, `slices`, and `search_api` parameters are ignored.
ES|QL returns query results in a structured tabular format, where data is organized into _columns_ (fields) and _values_ (entries).
The plugin maps each row of values to an event, populating the corresponding fields.
For example, a query might produce a table like:

[cols="2,1,1,2",options="header"]
|===
|`timestamp` |`user_id` | `action` | `status_code`

|2025-04-10T12:00:00 |123 |login |200
|2025-04-10T12:05:00 |456 |purchase |403
|===

In this case, the plugin emits two events that look like this:
[source, json]
    [
      {
        "timestamp": "2025-04-10T12:00:00",
        "user_id": 123,
        "action": "login",
        "status_code": 200
      },
      {
        "timestamp": "2025-04-10T12:05:00",
        "user_id": 456,
        "action": "purchase",
        "status_code": 403
      }
    ]

[id="plugins-{type}s-{plugin}-request_timeout_seconds"]
===== `request_timeout_seconds`

71 changes: 60 additions & 11 deletions lib/logstash/inputs/elasticsearch.rb
@@ -74,6 +74,7 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
require 'logstash/inputs/elasticsearch/paginated_search'
require 'logstash/inputs/elasticsearch/aggregation'
require 'logstash/inputs/elasticsearch/cursor_tracker'
require 'logstash/inputs/elasticsearch/esql'

include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
include LogStash::PluginMixins::ECSCompatibilitySupport::TargetCheck
@@ -96,15 +97,18 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
# The index or alias to search.
config :index, :validate => :string, :default => "logstash-*"

# The query to be executed. Read the Elasticsearch query DSL documentation
# for more info
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# The query to be executed. Either a DSL or an ES|QL (when `response_type => 'esql'`) query is accepted.
# Read the following documentation for more info:
# Query DSL: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html
# ES|QL: https://www.elastic.co/guide/en/elasticsearch/reference/current/esql.html
config :query, :validate => :string, :default => '{ "sort": [ "_doc" ] }'

# This allows you to specify the response type: either hits or aggregations
# where hits: normal search request
# aggregations: aggregation request
config :response_type, :validate => ['hits', 'aggregations'], :default => 'hits'
# This allows you to specify the response type: one of [hits, aggregations, esql]
# where
# hits: normal search request
# aggregations: aggregation request
# esql: ES|QL request
config :response_type, :validate => %w[hits aggregations esql], :default => 'hits'

# This allows you to set the maximum number of hits returned per scroll.
config :size, :validate => :number, :default => 1000
@@ -286,6 +290,9 @@ class LogStash::Inputs::Elasticsearch < LogStash::Inputs::Base
DEFAULT_EAV_HEADER = { "Elastic-Api-Version" => "2023-10-31" }.freeze
INTERNAL_ORIGIN_HEADER = { 'x-elastic-product-origin' => 'logstash-input-elasticsearch'}.freeze

LS_ESQL_SUPPORT_VERSION = "8.17.4" # the Logstash version that started shipping elasticsearch-ruby v8
ES_ESQL_SUPPORT_VERSION = "8.11.0"

def initialize(params={})
super(params)

@@ -302,10 +309,16 @@ def register
fill_hosts_from_cloud_id
setup_ssl_params!

@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
if @response_type == 'esql'
validate_ls_version_for_esql_support!
validate_esql_query!
inform_ineffective_esql_params
else
@base_query = LogStash::Json.load(@query)
if @slices
@base_query.include?('slice') && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `query` option cannot specify specific `slice` when configured to manage parallel slices with `slices` option")
@slices < 1 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `slices` option must be greater than zero, got `#{@slices}`")
end
end

@retries < 0 && fail(LogStash::ConfigurationError, "Elasticsearch Input Plugin's `retries` option must be equal or greater than zero, got `#{@retries}`")
@@ -341,6 +354,8 @@ def register

test_connection!

validate_es_for_esql_support!

setup_serverless

setup_search_api
@@ -364,6 +379,7 @@ def run(output_queue)
end

def get_query_object
return @query if @response_type == 'esql'
if @cursor_tracker
query = @cursor_tracker.inject_cursor(@query)
@logger.debug("new query is #{query}")
@@ -398,6 +414,12 @@ def event_from_hit(hit, root_field)
return event_factory.new_event('event' => { 'original' => serialized_hit }, 'tags' => ['_elasticsearch_input_failure'])
end

def decorate_and_push_to_queue(output_queue, mapped_entry)
event = targeted_event_factory.new_event mapped_entry
decorate(event)
output_queue << event
end

def set_docinfo_fields(hit, event)
# do not assume event[@docinfo_target] to be in-place updatable. first get it, update it, then at the end set it in the event.
docinfo_target = event.get(@docinfo_target) || {}
@@ -675,6 +697,8 @@ def setup_query_executor
end
when 'aggregations'
LogStash::Inputs::Elasticsearch::Aggregation.new(@client, self)
when 'esql'
LogStash::Inputs::Elasticsearch::Esql.new(@client, self)
end
end

@@ -714,6 +738,31 @@ def get_transport_client_class
::Elastic::Transport::Transport::HTTP::Manticore
end

def validate_ls_version_for_esql_support!
if Gem::Version.create(LOGSTASH_VERSION) < Gem::Version.create(LS_ESQL_SUPPORT_VERSION)
fail("Current version of Logstash does not include Elasticsearch client which supports ES|QL. Please upgrade Logstash to at least #{LS_ESQL_SUPPORT_VERSION}")
end
end

def validate_esql_query!
fail(LogStash::ConfigurationError, "`query` cannot be empty") if @query.strip.empty?
source_commands = %w[FROM ROW SHOW]
contains_source_command = source_commands.any? { |source_command| @query.strip.start_with?(source_command) }
fail(LogStash::ConfigurationError, "`query` needs to start with any of #{source_commands}") unless contains_source_command
end

def inform_ineffective_esql_params
ineffective_options = original_params.keys & %w(index target size slices search_api docinfo docinfo_target docinfo_fields)
@logger.info("Configured #{ineffective_options} params are ineffective in ES|QL mode") if ineffective_options.any?
end

def validate_es_for_esql_support!
return unless @response_type == 'esql'
# make sure connected ES supports ES|QL (8.11+)
es_supports_esql = Gem::Version.create(es_version) >= Gem::Version.create(ES_ESQL_SUPPORT_VERSION)
fail("Connected Elasticsearch #{es_version} version does not supports ES|QL. ES|QL feature requires at least Elasticsearch #{ES_ESQL_SUPPORT_VERSION} version.") unless es_supports_esql
end

module URIOrEmptyValidator
##
# @override to provide :uri_or_empty validator
83 changes: 83 additions & 0 deletions lib/logstash/inputs/elasticsearch/esql.rb
@@ -0,0 +1,83 @@
require 'logstash/helpers/loggable_try'

module LogStash
  module Inputs
    class Elasticsearch
      class Esql
        include LogStash::Util::Loggable

        ESQL_JOB = "ES|QL job"

        # Initialize the ESQL query executor
        # @param client [Elasticsearch::Client] The Elasticsearch client instance
        # @param plugin [LogStash::Inputs::Elasticsearch] The parent plugin instance
        def initialize(client, plugin)
          @client = client
          @plugin = plugin
          @retries = plugin.params["retries"]

          @query = plugin.params["query"]
          unless @query.include?('METADATA')
            logger.warn("The query doesn't include the METADATA keyword. Including it makes _id and _version available in the documents", {:query => @query})
          end
        end

        # Execute the ESQL query and process results
        # @param output_queue [Queue] The queue to push processed events to
        # @param query [String] Unused here; present to satisfy the query-executor interface
        def do_run(output_queue, query)
          logger.info("ES|QL executor starting")
          response = retryable(ESQL_JOB) do
            @client.esql.query({ body: { query: @query }, format: 'json' })
          end
          # retryable already logged the error details
          return if response == false

review note (@mashhurs, Apr 11, 2025): interestingly, the warnings come back in the response headers.

          if response&.headers&.dig("warning")
            logger.warn("ES|QL executor received warning", {:message => response.headers["warning"]})
          end
          if response['values'] && response['columns']
            process_response(response['values'], response['columns'], output_queue)
          end
        end

        # Execute a retryable operation with proper error handling
        # @param job_name [String] Name of the job for logging purposes
        # @yield The block to execute
        # @return [Boolean] true if successful, false otherwise
        def retryable(job_name, &block)
          stud_try = ::LogStash::Helpers::LoggableTry.new(logger, job_name)
          stud_try.try((@retries + 1).times) { yield }
        rescue => e
          error_details = {:message => e.message, :cause => e.cause}
          error_details[:backtrace] = e.backtrace if logger.debug?
          logger.error("#{job_name} failed with ", error_details)
          false
        end

        private

        # Process the ESQL response and push events to the output queue
        # @param values [Array[Array]] The ESQL query response hits
        # @param columns [Array[Hash]] The ESQL query response columns
        # @param output_queue [Queue] The queue to push processed events to
        def process_response(values, columns, output_queue)
          values.each do |value|
            mapped_data = map_column_and_values(columns, value)
            @plugin.decorate_and_push_to_queue(output_queue, mapped_data)
          end
        end

        # Map column names to their corresponding values
        # @param columns [Array] Array of column definitions
        # @param values [Array] Array of values for the current row
        # @return [Hash] Mapped data with column names as keys
        def map_column_and_values(columns, values)
          columns.each_with_index.with_object({}) do |(column, index), mapped_data|
            mapped_data[column["name"]] = values[index]
          end
        end
      end
    end
  end
end
2 changes: 1 addition & 1 deletion logstash-input-elasticsearch.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-elasticsearch'
s.version = '5.1.0'
s.version = '5.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads query results from an Elasticsearch cluster"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"