diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb
index 97ee24024..24aa6a8e0 100644
--- a/lib/logstash/outputs/elasticsearch.rb
+++ b/lib/logstash/outputs/elasticsearch.rb
@@ -17,7 +17,7 @@
 # called `http.content_type.required`. If this option is set to `true`, and you
 # are using Logstash 2.4 through 5.2, you need to update the Elasticsearch output
 # plugin to version 6.2.5 or higher.
-# 
+#
 # ================================================================================
 #
 # This plugin is the recommended method of storing logs in Elasticsearch.
@@ -26,8 +26,8 @@
 # This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0.
 # We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower,
 # yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having
-# to upgrade Logstash in lock-step. 
-# 
+# to upgrade Logstash in lock-step.
+#
 # You can learn more about Elasticsearch at
 #
 # ==== Template management for Elasticsearch 5.x
@@ -75,12 +75,12 @@
 #
 # ==== HTTP Compression
 #
-# This plugin supports request and response compression. Response compression is enabled by default and 
-# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for 
-# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in 
+# This plugin supports request and response compression. Response compression is enabled by default and
+# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for
+# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in
 # Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin
 #
-# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` 
+# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression`
 # setting in their Logstash config file.
 #
 class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
diff --git a/lib/logstash/outputs/elasticsearch/common.rb b/lib/logstash/outputs/elasticsearch/common.rb
index 9d6c68c7e..b89d8f2ff 100644
--- a/lib/logstash/outputs/elasticsearch/common.rb
+++ b/lib/logstash/outputs/elasticsearch/common.rb
@@ -20,6 +20,7 @@ def register
       @stopping = Concurrent::AtomicBoolean.new(false)
       # To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
       @dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
+      @ilm_seen_aliases = {}
 
       setup_hosts # properly sets @hosts
       build_client
@@ -27,6 +28,16 @@ def register
       check_action_validity
       @bulk_request_metrics = metric.namespace(:bulk_requests)
       @document_level_metrics = metric.namespace(:documents)
+
+      # Rollover-only mode takes precedence over ILM when both are enabled
+      if @ro_only_enabled && ilm_in_use?
+        @logger.debug("Preemptively creating rollover aliases, not adding ILM aliases/policies/settings in the process")
+      end
+
+      if @ro_only_enabled || ilm_in_use?
+        @logger.info("Caching seen/created aliases #{@ilm_cache_once ? 'once' : 'on every bulk'}")
'once' : 'every bulk'}") + end + @logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s)) end @@ -166,6 +177,33 @@ def retrying_submit(actions) # We retry with whatever is didn't succeed begin + + # Create alias(es) before bulking, ensuring rollover aliases exist. + # + # Using the hash let's us set improper aliases such as unsubstituted fields like + # %{abc} to the default alias instead, then return immediately when found to avoid + # raising the exception multiple times, as the exceptions are typically more expensive. + if ilm_in_use? && !@ilm_event_alias.nil? + created_aliases = @ilm_cache_once ? @ilm_seen_aliases : {} + submit_actions.each do |action, params, event| + if ['index', 'create'].include?(action) + begin + alias_name, new_index = maybe_create_rollover_alias_for_event(event, created_aliases, !@ro_only_enabled) + created_aliases[alias_name] = new_index + params[:_index] = new_index + rescue ::LogStash::Outputs::ElasticSearch::Ilm::ImproperAliasName => e + @logger.warn("Event alias name #{e.name} is not proper, using #{@ilm_rollover_alias} instead") + created_aliases[e.name] = @ilm_rollover_alias + params[:_index] = @ilm_rollover_alias + rescue => e + @logger.warn("Unknown error on creating event alias, #{e}, using #{@ilm_rollover_alias} instead") + params[:_index] = @ilm_rollover_alias + end + end + end + @logger.trace("Created/cached aliases: #{created_aliases.to_s}") + end + submit_actions = submit(submit_actions) if submit_actions && submit_actions.size > 0 @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request.", :count => submit_actions.size) diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index c449ca665..4e2975089 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -17,8 +17,8 @@ def self.included(mod) # Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME - mod.config :document_type, - :validate => :string, + mod.config :document_type, + :validate => :string, :deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature" # From Logstash 1.3 onwards, a template is applied to Elasticsearch during @@ -67,7 +67,7 @@ def self.included(mod) # The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here. # See https://www.elastic.co/blog/elasticsearch-versioning-support. mod.config :version, :validate => :string - + # The version_type to use for indexing. # See https://www.elastic.co/blog/elasticsearch-versioning-support. # See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types @@ -93,7 +93,7 @@ def self.included(mod) # `["https://127.0.0.1:9200/mypath"]` (If using a proxy on a subpath) # It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `hosts` list # to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes in Elasticsearch. - # + # # Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance. 
       mod.config :hosts, :validate => :uri, :default => [::LogStash::Util::SafeURI.new("//127.0.0.1")], :list => true
 
@@ -155,6 +155,31 @@ def self.included(mod)
       # ILM policy to use, if undefined the default policy will be used.
       mod.config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
 
+      # Add the index.lifecycle.rollover_alias setting to the alias upon creation; this is always done for aliases created via the ilm_event_alias setting.
+      mod.config :ilm_set_rollover_alias, :validate => [true, false, 'true', 'false'], :default => false
+
+      # Use this for event-substitution aliases; ilm_rollover_alias is used as the default when the field is missing or another unknown error occurs.
+      # Substitution syntax works here, e.g. "%{my_fantastic_field_here}-alias".
+      mod.config :ilm_event_alias, :validate => :string
+
+      # Cache seen/created aliases and their destinations once, rather than on every bulk batch.
+      mod.config :ilm_cache_once, :validate => [true, false, 'true', 'false'], :default => false
+
+
+      # -----
+      # Rollover Alias Creation
+      # -----
+
+      # Reuses ilm_event_alias, ilm_pattern and ilm_cache_once, as only ILM or rollover-only (RO) mode can be used at one time. Preemptive RO creation
+      # follows the same process as the ILM tweaks do; however, it does not insert the lifecycle alias setting or policy.
+      # The intent is to bootstrap the rollover alias and index before indexing, so that external tools can manage
+      # the actual rollover process, while ILM can manage the other lifecycle phases.
+      #
+      # Just like the ILM event substitution process, anything that does not match is sent to the default alias set via
+      # ilm_rollover_alias.
+
+      # Flag for enabling rollover alias creation; overrides ILM behavior.
+      mod.config :ro_only_enabled, :validate => [true, false, 'true', 'false'], :default => false
     end
   end
 end end end
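Note: to make the new event-alias options above concrete, here is a minimal sketch of how they might be combined in a pipeline. The host, alias names, and the `[service]` field are illustrative assumptions rather than values from this change, and an ILM policy is assumed to already exist:

    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        ilm_enabled => true
        ilm_rollover_alias => "logs-default"      # fallback when %{[service]} cannot be substituted
        ilm_event_alias => "logs-%{[service]}"    # per-event write alias, e.g. "logs-billing"
        ilm_pattern => "{now/d}-000001"
        ilm_cache_once => true                    # remember created aliases across bulk batches
      }
    }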
"?#{raw_query}" : nil - + raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}" ::LogStash::Util::SafeURI.new(raw_url) @@ -360,9 +359,17 @@ def rollover_alias_exists?(name) end # Create a new rollover alias - def rollover_alias_put(alias_name, alias_definition) - logger.info("Creating rollover alias #{alias_name}") + def rollover_alias_put(alias_name, alias_definition, add_rollover_settings) begin + if add_rollover_settings == true + real_alias_name, _ = alias_definition["aliases"].first + logger.debug("Adding lifecycle rollover_alias setting for #{alias_name} => #{real_alias_name}, add rollover was #{add_rollover_settings}") + alias_definition.merge!({ + 'settings' => { + 'index.lifecycle.rollover_alias' => real_alias_name + } + }) + end @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) # If the rollover alias already exists, ignore the error that comes back from Elasticsearch rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index 53c62f36a..5d4cfa99e 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -3,7 +3,7 @@ module LogStash; module Outputs; class ElasticSearch; class HttpClient; DEFAULT_HEADERS = { "Content-Type" => "application/json" } - + class ManticoreAdapter attr_reader :manticore, :logger @@ -18,14 +18,14 @@ def initialize(logger, options={}) options[:cookies] = false @client_params = {:headers => DEFAULT_HEADERS.merge(options[:headers] || {})} - + if options[:proxy] options[:proxy] = manticore_proxy_hash(options[:proxy]) end - + @manticore = ::Manticore::Client.new(options) end - + # Transform the proxy option to a hash. Manticore's support for non-hash # proxy options is broken. This was fixed in https://github.com/cheald/manticore/commit/34a00cee57a56148629ed0a47c329181e7319af5 # but this is not yet released @@ -55,12 +55,12 @@ def perform_request(url, method, path, params={}, body=nil) params[:body] = body if body if url.user - params[:auth] = { + params[:auth] = { :user => CGI.unescape(url.user), # We have to unescape the password here since manticore won't do it # for us unless its part of the URL - :password => CGI.unescape(url.password), - :eager => true + :password => CGI.unescape(url.password), + :eager => true } end @@ -86,7 +86,7 @@ def perform_request(url, method, path, params={}, body=nil) # Returned urls from this method should be checked for double escaping. def format_url(url, path_and_query=nil) request_uri = url.clone - + # We excise auth info from the URL in case manticore itself tries to stick # sensitive data in a thrown exception or log data request_uri.user = nil @@ -102,7 +102,7 @@ def format_url(url, path_and_query=nil) new_query_parts = [request_uri.query, parsed_path_and_query.query].select do |part| part && !part.empty? # Skip empty nil and "" end - + request_uri.query = new_query_parts.join("&") unless new_query_parts.empty? 
diff --git a/lib/logstash/outputs/elasticsearch/ilm.rb b/lib/logstash/outputs/elasticsearch/ilm.rb
index a9f43f12a..1cb97f1a7 100644
--- a/lib/logstash/outputs/elasticsearch/ilm.rb
+++ b/lib/logstash/outputs/elasticsearch/ilm.rb
@@ -87,8 +87,41 @@ def maybe_create_ilm_policy
       end
     end
 
+    class ImproperAliasName < StandardError
+      attr_reader :name
+      def initialize(msg="Index not proper", name)
+        @name = name
+        super(msg)
+      end
+    end
+
+    def maybe_create_rollover_alias_for_event(event, created_aliases, is_ilm_request)
+      alias_name = event.sprintf(ilm_event_alias)
+      return alias_name, created_aliases[alias_name] if created_aliases.has_key?(alias_name)
+      improper = alias_name == ilm_event_alias
+      alias_name = improper ? ilm_rollover_alias : alias_name
+      alias_target = "<#{alias_name}-#{ilm_pattern}>"
+      alias_payload = {
+        'aliases' => {
+          alias_name => {
+            'is_write_index' => true
+          }
+        }
+      }
+      # Without placing the settings on the index, something will need to add them afterwards
+      # (for example via a template), or the first index will never roll over.
+      do_ilm_request = is_ilm_request ? ilm_set_rollover_alias : false
+      logger.trace("Putting rollover alias #{alias_target} for #{alias_name}, ILM request: #{is_ilm_request}, set rollover_alias setting: #{do_ilm_request}")
+      client.rollover_alias_put(alias_target, alias_payload, do_ilm_request) unless client.rollover_alias_exists?(alias_name)
+
+      # Raise this afterwards, so the caller can record the broken alias properly
+      raise ImproperAliasName.new(name=event.sprintf(ilm_event_alias)) if improper
+
+      return alias_name, alias_name
+    end
+
     def maybe_create_rollover_alias
-      client.rollover_alias_put(rollover_alias_target, rollover_alias_payload) unless client.rollover_alias_exists?(ilm_rollover_alias)
+      client.rollover_alias_put(rollover_alias_target, rollover_alias_payload, ilm_set_rollover_alias) unless client.rollover_alias_exists?(ilm_rollover_alias)
     end
 
     def rollover_alias_target
@@ -98,7 +131,7 @@ def rollover_alias_target
     def rollover_alias_payload
       {
         'aliases' => {
-          ilm_rollover_alias =>{
+          ilm_rollover_alias => {
            'is_write_index' => true
          }
        }
@@ -110,4 +143,4 @@ def policy_payload
       LogStash::Json.load(::IO.read(policy_path))
     end
   end
-end end end
\ No newline at end of file
+end end end
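Note: a minimal sketch of the rollover-only bootstrap mode described in common_configs.rb, under the assumption (per the comments there) that an external tool or a separately attached ILM policy performs the actual rollover; the host, the `[team]` field, and the alias names are illustrative only:

    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        ro_only_enabled => true                    # create rollover aliases/indices up front, without ILM settings or policy
        ilm_rollover_alias => "metrics-default"    # fallback when %{[team]} cannot be substituted
        ilm_event_alias => "metrics-%{[team]}"
        ilm_pattern => "{now/d}-000001"
        ilm_cache_once => true
      }
    }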