Skip to content

Commit 9465260

Browse files
separate common methods into elasticsearch specific and shared methods (logstash-plugins#976)
specific elasticsearch methods are now into the main elasticsearch class file and shared methods have been moved into the PluginMixins::ElasticSearch::Common namespace. The license checking code has been externalized and can now be specified as argument to the build_client method. This is code refactorting that has no end-user impact.
1 parent 889a16e commit 9465260

File tree

11 files changed

+380
-241
lines changed

11 files changed

+380
-241
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## unreleased
22
- Refactored configuration options into specific and shared in PluginMixins namespace [#973](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/973)
3+
- Refactored common methods into specific and shared in PluginMixins namespace [#976](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/976)
34

45
## 10.7.3
56
- Added composable index template support for elasticsearch version 8 [#980](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/980)

lib/logstash/outputs/elasticsearch.rb

+174-37
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,16 @@
8686
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
8787
declare_threadsafe!
8888

89+
require "logstash/outputs/elasticsearch/license_checker"
8990
require "logstash/outputs/elasticsearch/http_client"
9091
require "logstash/outputs/elasticsearch/http_client_builder"
9192
require "logstash/plugin_mixins/elasticsearch/api_configs"
92-
require "logstash/outputs/elasticsearch/common"
93+
require "logstash/plugin_mixins/elasticsearch/common"
9394
require "logstash/outputs/elasticsearch/ilm"
9495
require 'logstash/plugin_mixins/ecs_compatibility_support'
9596

9697
# Protocol agnostic methods
97-
include(LogStash::Outputs::ElasticSearch::Common)
98+
include(LogStash::PluginMixins::ElasticSearch::Common)
9899

99100
# Methods for ILM support
100101
include(LogStash::Outputs::ElasticSearch::Ilm)
@@ -246,33 +247,37 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
246247
# ILM policy to use, if undefined the default policy will be used.
247248
config :ilm_policy, :validate => :string, :default => DEFAULT_POLICY
248249

250+
attr_reader :default_index
251+
attr_reader :default_ilm_rollover_alias
252+
attr_reader :default_template_name
253+
249254
def initialize(*params)
250255
super
251256
setup_ecs_compatibility_related_defaults
252257
end
253258

254-
def setup_ecs_compatibility_related_defaults
255-
case ecs_compatibility
256-
when :disabled
257-
@default_index = "logstash-%{+yyyy.MM.dd}"
258-
@default_ilm_rollover_alias = "logstash"
259-
@default_template_name = 'logstash'
260-
when :v1
261-
@default_index = "ecs-logstash-%{+yyyy.MM.dd}"
262-
@default_ilm_rollover_alias = "ecs-logstash"
263-
@default_template_name = 'ecs-logstash'
264-
else
265-
fail("unsupported ECS Compatibility `#{ecs_compatibility}`")
266-
end
259+
def register
260+
@template_installed = Concurrent::AtomicBoolean.new(false)
261+
@stopping = Concurrent::AtomicBoolean.new(false)
262+
# To support BWC, we check if DLQ exists in core (< 5.4). If it doesn't, we use nil to resort to previous behavior.
263+
@dlq_writer = dlq_enabled? ? execution_context.dlq_writer : nil
267264

268-
@index ||= default_index
269-
@ilm_rollover_alias ||= default_ilm_rollover_alias
270-
@template_name ||= default_template_name
271-
end
265+
check_action_validity
272266

273-
attr_reader :default_index
274-
attr_reader :default_ilm_rollover_alias
275-
attr_reader :default_template_name
267+
# the license_checking behaviour in the Pool class is externalized in the LogStash::ElasticSearchOutputLicenseChecker
268+
# class defined in license_check.rb. This license checking is specific to the elasticsearch output here and passed
269+
# to build_client down to the Pool class.
270+
build_client(LicenseChecker.new(@logger))
271+
272+
@template_installer = setup_after_successful_connection do
273+
discover_cluster_uuid
274+
install_template
275+
setup_ilm if ilm_in_use?
276+
end
277+
@bulk_request_metrics = metric.namespace(:bulk_requests)
278+
@document_level_metrics = metric.namespace(:documents)
279+
@logger.info("New Elasticsearch output", :class => self.class.name, :hosts => @hosts.map(&:sanitized).map(&:to_s))
280+
end
276281

277282
# @override to handle proxy => '' as if none was set
278283
def config_init(params)
@@ -290,20 +295,12 @@ def config_init(params)
290295
super(params)
291296
end
292297

293-
def build_client
294-
# the following 3 options validation & setup methods are called inside build_client
295-
# because they must be executed prior to building the client and logstash
296-
# monitoring and management rely on directly calling build_client
297-
# see https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/934#pullrequestreview-396203307
298-
validate_authentication
299-
fill_hosts_from_cloud_id
300-
setup_hosts
301-
302-
params["metric"] = metric
303-
if @proxy.eql?('')
304-
@logger.warn "Supplied proxy setting (proxy => '') has no effect"
298+
# Receive an array of events and immediately attempt to index them (no buffering)
299+
def multi_receive(events)
300+
until @template_installed.true?
301+
sleep 1
305302
end
306-
@client ||= ::LogStash::Outputs::ElasticSearch::HttpClientBuilder.build(@logger, @hosts, params)
303+
retrying_submit(events.map {|e| event_action_tuple(e)})
307304
end
308305

309306
def close
@@ -312,8 +309,65 @@ def close
312309
@client.close if @client
313310
end
314311

315-
def self.oss?
316-
LogStash::OSS
312+
# not private because used by ILM specs
313+
def stop_template_installer
314+
@template_installer.join unless @template_installer.nil?
315+
end
316+
317+
# not private for elasticsearch_spec.rb
318+
# Convert the event into a 3-tuple of action, params, and event
319+
def event_action_tuple(event)
320+
action = event.sprintf(@action)
321+
322+
params = {
323+
:_id => @document_id ? event.sprintf(@document_id) : nil,
324+
:_index => event.sprintf(@index),
325+
routing_field_name => @routing ? event.sprintf(@routing) : nil
326+
}
327+
328+
params[:_type] = get_event_type(event) if use_event_type?(nil)
329+
330+
if @pipeline
331+
value = event.sprintf(@pipeline)
332+
# convention: empty string equates to not using a pipeline
333+
# this is useful when using a field reference in the pipeline setting, e.g.
334+
# elasticsearch {
335+
# pipeline => "%{[@metadata][pipeline]}"
336+
# }
337+
params[:pipeline] = value unless value.empty?
338+
end
339+
340+
if @parent
341+
if @join_field
342+
join_value = event.get(@join_field)
343+
parent_value = event.sprintf(@parent)
344+
event.set(@join_field, { "name" => join_value, "parent" => parent_value })
345+
params[routing_field_name] = event.sprintf(@parent)
346+
else
347+
params[:parent] = event.sprintf(@parent)
348+
end
349+
end
350+
351+
if action == 'update'
352+
params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @upsert != ""
353+
params[:_script] = event.sprintf(@script) if @script != ""
354+
params[retry_on_conflict_action_name] = @retry_on_conflict
355+
end
356+
357+
if @version
358+
params[:version] = event.sprintf(@version)
359+
end
360+
361+
if @version_type
362+
params[:version_type] = event.sprintf(@version_type)
363+
end
364+
365+
[action, params, event]
366+
end
367+
368+
# not private for elasticsearch_spec.rb
369+
def retry_on_conflict_action_name
370+
maximum_seen_major_version >= 7 ? :retry_on_conflict : :_retry_on_conflict
317371
end
318372

319373
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
@@ -322,4 +376,87 @@ def self.oss?
322376
name = plugin.name.split('-')[-1]
323377
require "logstash/outputs/elasticsearch/#{name}"
324378
end
379+
380+
private
381+
382+
def routing_field_name
383+
maximum_seen_major_version >= 6 ? :routing : :_routing
384+
end
385+
386+
# Determine the correct value for the 'type' field for the given event
387+
DEFAULT_EVENT_TYPE_ES6="doc".freeze
388+
DEFAULT_EVENT_TYPE_ES7="_doc".freeze
389+
def get_event_type(event)
390+
# Set the 'type' value for the index.
391+
type = if @document_type
392+
event.sprintf(@document_type)
393+
else
394+
if maximum_seen_major_version < 6
395+
event.get("type") || DEFAULT_EVENT_TYPE_ES6
396+
elsif maximum_seen_major_version == 6
397+
DEFAULT_EVENT_TYPE_ES6
398+
elsif maximum_seen_major_version == 7
399+
DEFAULT_EVENT_TYPE_ES7
400+
else
401+
nil
402+
end
403+
end
404+
405+
if !(type.is_a?(String) || type.is_a?(Numeric))
406+
@logger.warn("Bad event type! Non-string/integer type value set!", :type_class => type.class, :type_value => type.to_s, :event => event)
407+
end
408+
409+
type.to_s
410+
end
411+
412+
##
413+
# WARNING: This method is overridden in a subclass in Logstash Core 7.7-7.8's monitoring,
414+
# where a `client` argument is both required and ignored. In later versions of
415+
# Logstash Core it is optional and ignored, but to make it optional here would
416+
# allow us to accidentally break compatibility with Logstashes where it was required.
417+
# @param noop_required_client [nil]: required `nil` for legacy reasons.
418+
# @return [Boolean]
419+
def use_event_type?(noop_required_client)
420+
maximum_seen_major_version < 8
421+
end
422+
423+
def install_template
424+
TemplateManager.install_template(self)
425+
@template_installed.make_true
426+
end
427+
428+
def setup_ecs_compatibility_related_defaults
429+
case ecs_compatibility
430+
when :disabled
431+
@default_index = "logstash-%{+yyyy.MM.dd}"
432+
@default_ilm_rollover_alias = "logstash"
433+
@default_template_name = 'logstash'
434+
when :v1
435+
@default_index = "ecs-logstash-%{+yyyy.MM.dd}"
436+
@default_ilm_rollover_alias = "ecs-logstash"
437+
@default_template_name = 'ecs-logstash'
438+
else
439+
fail("unsupported ECS Compatibility `#{ecs_compatibility}`")
440+
end
441+
442+
@index ||= default_index
443+
@ilm_rollover_alias ||= default_ilm_rollover_alias
444+
@template_name ||= default_template_name
445+
end
446+
447+
# To be overidden by the -java version
448+
VALID_HTTP_ACTIONS=["index", "delete", "create", "update"]
449+
def valid_actions
450+
VALID_HTTP_ACTIONS
451+
end
452+
453+
def check_action_validity
454+
raise LogStash::ConfigurationError, "No action specified!" unless @action
455+
456+
# If we're using string interpolation, we're good!
457+
return if @action =~ /%{.+}/
458+
return if valid_actions.include?(@action)
459+
460+
raise LogStash::ConfigurationError, "Action '#{@action}' is invalid! Pick one of #{valid_actions} or use a sprintf style statement"
461+
end
325462
end

lib/logstash/outputs/elasticsearch/http_client.rb

+1
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,7 @@ def build_pool(options)
286286
adapter = build_adapter(options)
287287

288288
pool_options = {
289+
:license_checker => options[:license_checker],
289290
:sniffing => sniffing,
290291
:sniffer_delay => options[:sniffer_delay],
291292
:sniffing_path => options[:sniffing_path],

lib/logstash/outputs/elasticsearch/http_client/pool.rb

+13-28
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
require "logstash/plugin_mixins/elasticsearch/noop_license_checker"
2+
13
module LogStash; module Outputs; class ElasticSearch; class HttpClient;
24
class Pool
35
class NoConnectionAvailableError < Error; end
@@ -29,6 +31,7 @@ def message
2931
end
3032

3133
attr_reader :logger, :adapter, :sniffing, :sniffer_delay, :resurrect_delay, :healthcheck_path, :sniffing_path, :bulk_path
34+
attr_reader :license_checker # license_checker is used by the pool specs
3235

3336
ROOT_URI_PATH = '/'.freeze
3437
LICENSE_PATH = '/_license'.freeze
@@ -66,12 +69,10 @@ def initialize(logger, adapter, initial_urls=[], options={})
6669
# Holds metadata about all URLs
6770
@url_info = {}
6871
@stopping = false
69-
end
7072

71-
def oss?
72-
LogStash::Outputs::ElasticSearch.oss?
73+
@license_checker = options.fetch(:license_checker) { LogStash::PluginMixins::ElasticSearch::NoopLicenseChecker::INSTANCE }
7374
end
74-
75+
7576
def start
7677
update_initial_urls
7778
start_resurrectionist
@@ -210,7 +211,6 @@ def address_str_to_uri(addr_str)
210211
end
211212
end
212213

213-
214214
def sniff_2x_1x(nodes)
215215
nodes.map do |id,info|
216216
# TODO Make sure this works with shield. Does that listed
@@ -245,16 +245,15 @@ def start_resurrectionist
245245
end
246246
end
247247

248+
# Retrieve ES node license information
249+
# @param url [LogStash::Util::SafeURI] ES node URL
250+
# @return [Hash] deserialized license document or empty Hash upon any error
248251
def get_license(url)
249252
response = perform_request_to_url(url, :get, LICENSE_PATH)
250253
LogStash::Json.load(response.body)
251-
end
252-
253-
def valid_es_license?(url)
254-
license = get_license(url)
255-
license.fetch("license", {}).fetch("status", nil) == "active"
256254
rescue => e
257-
false
255+
logger.error("Unable to get license information", url: url.sanitized.to_s, error_type: e.class, error: e.message)
256+
{}
258257
end
259258

260259
def health_check_request(url)
@@ -282,19 +281,9 @@ def healthcheck!
282281
@logger.warn("Detected a node with a higher major version than previously observed. This could be the result of an elasticsearch cluster upgrade.", :previous_major => @maximum_seen_major_version, :new_major => major, :node_url => url.sanitized.to_s)
283282
set_new_major_version(major)
284283
end
285-
if oss? || valid_es_license?(url)
286-
meta[:state] = :alive
287-
else
288-
# As this version is to be shipped with Logstash 7.x we won't mark the connection as unlicensed
289-
#
290-
# logger.error("Cannot connect to the Elasticsearch cluster configured in the Elasticsearch output. Logstash requires the default distribution of Elasticsearch. Please update to the default distribution of Elasticsearch for full access to all free features, or switch to the OSS distribution of Logstash.", :url => url.sanitized.to_s)
291-
# meta[:state] = :unlicensed
292-
#
293-
# Instead we'll log a deprecation warning and mark it as alive:
294-
#
295-
log_license_deprecation_warn(url)
296-
meta[:state] = :alive
297-
end
284+
285+
alive = @license_checker.appropriate_license?(self, url)
286+
meta[:state] = alive ? :alive : :dead
298287
end
299288
rescue HostUnreachableError, BadResponseCodeError => e
300289
logger.warn("Attempted to resurrect connection to dead ES instance, but got an error.", url: url.sanitized.to_s, error_type: e.class, error: e.message)
@@ -306,10 +295,6 @@ def stop_resurrectionist
306295
@resurrectionist.join if @resurrectionist
307296
end
308297

309-
def log_license_deprecation_warn(url)
310-
logger.warn("DEPRECATION WARNING: Connecting to an OSS distribution of Elasticsearch using the default distribution of Logstash will stop working in Logstash 8.0.0. Please upgrade to the default distribution of Elasticsearch, or use the OSS distribution of Logstash", :url => url.sanitized.to_s)
311-
end
312-
313298
def resurrectionist_alive?
314299
@resurrectionist ? @resurrectionist.alive? : nil
315300
end

lib/logstash/outputs/elasticsearch/http_client_builder.rb

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ def self.build(logger, hosts, params)
1515
client_settings[:proxy] = params["proxy"] if params["proxy"]
1616

1717
common_options = {
18+
:license_checker => params["license_checker"],
1819
:client_settings => client_settings,
1920
:metric => params["metric"],
2021
:resurrect_delay => params["resurrect_delay"]

0 commit comments

Comments
 (0)