From b0ab99a4beb2587c04b6ef0f3fbb201e656889b6 Mon Sep 17 00:00:00 2001
From: Pierre Deman
Date: Fri, 12 Jul 2024 11:32:59 +0200
Subject: [PATCH 1/2] Add firehose output plugin and spec

---
 lib/logstash/outputs/firehose.rb | 146 ++++++++++++++++++++++++++++++++
 logstash-integration-aws.gemspec |   5 +-
 spec/outputs/firehose_spec.rb    | 111 ++++++++++++++++++++++++
 3 files changed, 261 insertions(+), 1 deletion(-)
 create mode 100644 lib/logstash/outputs/firehose.rb
 create mode 100644 spec/outputs/firehose_spec.rb

diff --git a/lib/logstash/outputs/firehose.rb b/lib/logstash/outputs/firehose.rb
new file mode 100644
index 0000000..504367a
--- /dev/null
+++ b/lib/logstash/outputs/firehose.rb
@@ -0,0 +1,146 @@
+require "logstash/outputs/base"
+require "logstash/namespace"
+require 'logstash/plugin_mixins/aws_config'
+require 'stud/interval'
+
+require "aws-sdk-firehose"
+
+# Push events to an Amazon Web Services (AWS) Data Firehose delivery stream.
+#
+# Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations
+# such as Amazon services or HTTP endpoints owned by supported third-party service providers.
+# See: https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html
+#
+# This plugin uses the AWS SDK to send data to the Firehose stream.
+# See https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html#writing-with-sdk
+#
+# Your identity must have the following permissions on the stream:
+# * `firehose:PutRecordBatch`
+#
+# ==== Batch Publishing
+# This output publishes messages to Firehose in batches in order to optimize event throughput and increase performance.
+# This is done using the `PutRecordBatch` API.
+# See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+#
+# When publishing messages to Firehose in batches, the following service limits must be respected:
+# * Each PutRecordBatch request supports up to 500 records.
+# * Each record in the request can be as large as 1,000 KB.
+# * All records in the request can be as large as 4 MB.
+#
+# This plugin will dynamically adjust the size of the batch published to Firehose in
+# order to ensure that the total payload size does not exceed the limits.
+#
+class LogStash::Outputs::Firehose < LogStash::Outputs::Base
+  include LogStash::PluginMixins::AwsConfig::V2
+
+  RECORDS_MAX_BATCH_COUNT = 500
+  RECORD_MAX_SIZE_BYTES = 1_000_000
+  RECORD_TOTAL_MAX_SIZE_BYTES = 4_000_000
+  REQUEST_RETRY_INTERVAL_SECONDS = 2
+
+  config_name "firehose"
+  concurrency :shared
+  default :codec, 'json'
+
+  # The name of the delivery stream.
+  # Note that this is just the name of the stream, not the URL or ARN.
+  config :delivery_stream_name, :validate => :string, :required => true
+
+  # The maximum number of records to be sent in each batch.
+  config :batch_max_count, :validate => :number, :default => RECORDS_MAX_BATCH_COUNT
+
+  # The maximum number of bytes for any record sent to Firehose.
+  # Messages exceeding this size will be dropped.
+  # See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+  config :record_max_size_bytes, :validate => :bytes, :default => RECORD_MAX_SIZE_BYTES
+
+  # The maximum number of bytes for all records sent to Firehose.
+  # See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+  config :record_total_max_size_bytes, :validate => :bytes, :default => RECORD_TOTAL_MAX_SIZE_BYTES
+
+  def register
+    if @batch_max_count > RECORDS_MAX_BATCH_COUNT
+      raise LogStash::ConfigurationError, "The maximum batch size is #{RECORDS_MAX_BATCH_COUNT} records"
+    elsif @batch_max_count < 1
+      raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
+    end
+
+    if @record_max_size_bytes > RECORD_MAX_SIZE_BYTES
+      raise LogStash::ConfigurationError, "The maximum record size is #{RECORD_MAX_SIZE_BYTES} bytes"
+    elsif @record_max_size_bytes < 1
+      raise LogStash::ConfigurationError, 'The record size must be greater than 0'
+    end
+
+    if @record_total_max_size_bytes > RECORD_TOTAL_MAX_SIZE_BYTES
+      raise LogStash::ConfigurationError, "The maximum total batch size is #{RECORD_TOTAL_MAX_SIZE_BYTES} bytes"
+    elsif @record_total_max_size_bytes < 1
+      raise LogStash::ConfigurationError, 'The total batch size must be greater than 0'
+    end
+
+    @logger.info("New Firehose output", :delivery_stream_name => @delivery_stream_name,
+                 :batch_max_count => @batch_max_count,
+                 :record_max_size_bytes => @record_max_size_bytes,
+                 :record_total_max_size_bytes => @record_total_max_size_bytes)
+    @firehose = Aws::Firehose::Client.new(aws_options_hash)
+  end
+
+  public def multi_receive_encoded(encoded_events)
+    return if encoded_events.empty?
+
+    @logger.debug("Multi receive encoded", :encoded_events => encoded_events)
+
+    records_bytes = 0
+    records = []
+
+    encoded_events.each do |_, encoded|
+
+      if encoded.bytesize > @record_max_size_bytes
+        @logger.warn('Record exceeds maximum size and will be dropped', :record => encoded, :size => encoded.bytesize)
+        next
+      end
+
+      if records.size >= @batch_max_count || (records_bytes + encoded.bytesize) > @record_total_max_size_bytes
+        put_record_batch(records)
+        records_bytes = 0
+        records = []
+      end
+
+      records_bytes += encoded.bytesize
+      records << { :data => encoded }
+    end
+
+    put_record_batch(records) unless records.empty?
+  end
+
+  def put_record_batch(records)
+    return if records.nil? || records.empty?
+
+    @logger.debug("Publishing records", :batch => records.size)
+
+    begin
+      put_response = @firehose.put_record_batch({
+                                                  delivery_stream_name: @delivery_stream_name,
+                                                  records: records
+                                                })
+    rescue => e
+      @logger.error("Encountered an unexpected error submitting a batch request, will retry",
+                    message: e.message, exception: e.class, backtrace: e.backtrace)
+      Stud.stoppable_sleep(REQUEST_RETRY_INTERVAL_SECONDS)
+      retry
+    end
+
+    if put_response.failed_put_count == 0
+      @logger.debug("Published records successfully", :batch => records.size)
+      return
+    end
+
+    put_response.request_responses
+                .filter { |r| !r.error_code.nil? }
+                .each do |response|
+      @logger.warn('Record publish error, will be dropped', :response => response)
+    end unless put_response.request_responses.nil?
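+    # NOTE: records rejected by Firehose (error_code set in request_responses) are only
+    # logged and dropped here; whole-request failures are retried in the rescue above.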
+
+  end
+end
diff --git a/logstash-integration-aws.gemspec b/logstash-integration-aws.gemspec
index fa1076b..facafa8 100644
--- a/logstash-integration-aws.gemspec
+++ b/logstash-integration-aws.gemspec
@@ -22,7 +22,8 @@ Gem::Specification.new do |s|
     logstash-output-cloudwatch
     logstash-output-s3
     logstash-output-sns
-    logstash-output-sqs).join(",")
+    logstash-output-sqs
+    logstash-output-firehose).join(",")
 }
@@ -45,6 +46,7 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency "aws-sdk-cloudwatch"
   s.add_runtime_dependency "aws-sdk-cloudfront"
   s.add_runtime_dependency "aws-sdk-resourcegroups"
+  s.add_runtime_dependency "aws-sdk-firehose"
 
   s.add_development_dependency "logstash-codec-json_lines"
   s.add_development_dependency "logstash-codec-multiline"
@@ -52,5 +54,6 @@ Gem::Specification.new do |s|
   s.add_development_dependency "logstash-codec-line"
   s.add_development_dependency "logstash-devutils"
   s.add_development_dependency "logstash-input-generator"
+  s.add_development_dependency "logstash-mixin-ecs_compatibility_support"
   s.add_development_dependency "timecop"
 end
diff --git a/spec/outputs/firehose_spec.rb b/spec/outputs/firehose_spec.rb
new file mode 100644
index 0000000..df313bf
--- /dev/null
+++ b/spec/outputs/firehose_spec.rb
@@ -0,0 +1,111 @@
+require 'logstash/outputs/firehose'
+
+describe LogStash::Outputs::Firehose do
+
+  let(:configuration) { { "delivery_stream_name" => "test" } }
+  let(:output) { LogStash::Plugin.lookup("output", "firehose").new(configuration) }
+
+  describe "#register" do
+
+    context "when no delivery stream specified" do
+      let(:configuration) { {} }
+      it "fails with a configuration error" do
+        expect { output.register }.to raise_error(LogStash::ConfigurationError)
+      end
+    end
+
+    context "when batch max count out of bounds" do
+      [0, LogStash::Outputs::Firehose::RECORDS_MAX_BATCH_COUNT + 1].each do |batch_max_count|
+        let(:configuration) { { "delivery_stream_name" => "test", "batch_max_count" => batch_max_count } }
+        it "fails with a configuration error" do
+          expect { output.register }.to raise_error(LogStash::ConfigurationError)
+        end
+      end
+    end
+
+    context "when record max size out of bounds" do
+      [0, LogStash::Outputs::Firehose::RECORD_MAX_SIZE_BYTES + 1].each do |record_max_size_bytes|
+        let(:configuration) { { "delivery_stream_name" => "test", "record_max_size_bytes" => record_max_size_bytes } }
+        it "fails with a configuration error" do
+          expect { output.register }.to raise_error(LogStash::ConfigurationError)
+        end
+      end
+    end
+
+    context "when record total max size out of bounds" do
+      [0, LogStash::Outputs::Firehose::RECORD_TOTAL_MAX_SIZE_BYTES + 1].each do |record_total_max_size_bytes|
+        let(:configuration) { { "delivery_stream_name" => "test", "record_total_max_size_bytes" => record_total_max_size_bytes } }
+        it "fails with a configuration error" do
+          expect { output.register }.to raise_error(LogStash::ConfigurationError)
+        end
+      end
+    end
+  end
+
+  describe "#multi_receive_encoded" do
+
+    context "when records empty" do
+      it "does not push" do
+        expect(output).not_to receive(:put_record_batch)
+        output.multi_receive_encoded([])
+      end
+    end
+
+    context "when record too big" do
+      it "does not put" do
+        output.instance_variable_set(:@record_max_size_bytes, 1)
+        expect(output).not_to receive(:put_record_batch)
+        output.multi_receive_encoded([[nil, "{}"]])
+      end
+    end
+
+    context "when receiving events" do
+
+      event1 = "{one}"
+      event2 = "{two}"
+      event3 = "{three}"
+
+      it "splits batches by count" do
+        output.instance_variable_set(:@batch_max_count, 2)
+        expect(output).to receive(:put_record_batch).once.with([{ :data => event1 }, { :data => event2 }])
+        expect(output).to receive(:put_record_batch).once.with([{ :data => event3 }])
+        output.multi_receive_encoded([[nil, event1], [nil, event2], [nil, event3]])
+      end
+
+      it "splits batches by size" do
+        output.instance_variable_set(:@record_total_max_size_bytes, event1.bytesize + event2.bytesize)
+        expect(output).to receive(:put_record_batch).once.with([{ :data => event1 }, { :data => event2 }])
+        expect(output).to receive(:put_record_batch).once.with([{ :data => event3 }])
+        output.multi_receive_encoded([[nil, event1], [nil, event2], [nil, event3]])
+      end
+    end
+
+  end
+
+  describe "#put_record_batch" do
+
+    let(:firehose_double) { instance_double(Aws::Firehose::Client) }
+
+    before do
+      allow(Aws::Firehose::Client).to receive(:new).and_return(firehose_double)
+      output.register
+    end
+
+    context "when records empty" do
+      it "does not push" do
+        expect(firehose_double).not_to receive(:put_record_batch)
+        output.put_record_batch([])
+      end
+    end
+
+    context "when firehose raises an exception" do
+      it "retries the request" do
+        expect(firehose_double).to receive(:put_record_batch).twice.and_invoke(
+          proc { |_| raise RuntimeError.new('Put failed') },
+          proc { |_| Aws::Firehose::Types::PutRecordBatchOutput.new }
+        )
+        output.put_record_batch(["test_record"])
+      end
+    end
+  end
+end
\ No newline at end of file

From df5972e0e1a50fa0a8ffeaedc897477e89979a40 Mon Sep 17 00:00:00 2001
From: Pierre Deman
Date: Wed, 17 Jul 2024 15:44:00 +0200
Subject: [PATCH 2/2] Add documentation

---
 docs/index.asciidoc           |   1 +
 docs/output-firehose.asciidoc | 215 ++++++++++++++++++++++++++++++++++
 2 files changed, 216 insertions(+)
 create mode 100644 docs/output-firehose.asciidoc

diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 58f2dc0..1f3a1c5 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -29,6 +29,7 @@ The AWS Integration Plugin provides integrated plugins for working with Amazon W
 * {logstash-ref}/plugins-inputs-s3.html[S3 Input Plugin]
 * {logstash-ref}/plugins-inputs-sqs.html[Sqs Input Plugin]
 * {logstash-ref}/plugins-outputs-cloudwatch.html[Cloudwatch Output Plugin]
+ * {logstash-ref}/plugins-outputs-firehose.html[Firehose Output Plugin]
 * {logstash-ref}/plugins-outputs-s3.html[S3 Output Plugin]
 * {logstash-ref}/plugins-outputs-sns.html[Sns Output Plugin]
 * {logstash-ref}/plugins-outputs-sqs.html[Sqs Output Plugin]
diff --git a/docs/output-firehose.asciidoc b/docs/output-firehose.asciidoc
new file mode 100644
index 0000000..aebac37
--- /dev/null
+++ b/docs/output-firehose.asciidoc
@@ -0,0 +1,215 @@
+:integration: aws
+:plugin: firehose
+:type: output
+:default_codec: json
+
+///////////////////////////////////////////
+START - GENERATED VARIABLES, DO NOT EDIT!
+///////////////////////////////////////////
+:version: %VERSION%
+:release_date: %RELEASE_DATE%
+:changelog_url: %CHANGELOG_URL%
+:include_path: ../../../../logstash/docs/include
+///////////////////////////////////////////
+END - GENERATED VARIABLES, DO NOT EDIT!
+///////////////////////////////////////////
+
+[id="plugins-{type}s-{plugin}"]
+=== Firehose output plugin
+
+include::{include_path}/plugin_header-integration.asciidoc[]
+
+==== Description
+
+Push events to an Amazon Web Services (AWS) Data Firehose delivery stream.
+
+Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon services or HTTP endpoints owned by supported third-party service providers.
+See: https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html
+
+This plugin uses the AWS SDK to send data to the Firehose stream.
+See https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html#writing-with-sdk
+
+Your identity must have the following permissions on the stream:
+* `firehose:PutRecordBatch`
+
+==== Batch Publishing
+
+This output publishes messages to Firehose in batches in order to optimize event throughput and increase performance.
+This is done using the `PutRecordBatch` API.
+See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+
+When publishing messages to Firehose in batches, the following service limits must be respected:
+* Each PutRecordBatch request supports up to 500 records.
+* Each record in the request can be as large as 1,000 KB.
+* All records in the request can be as large as 4 MB.
+
+This plugin will dynamically adjust the size of the batch published to Firehose in order to ensure that the total payload size does not exceed the limits.
+
+[id="plugins-{type}s-{plugin}-options"]
+
+==== Firehose Output Configuration Options
+
+This plugin supports the following configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.
+
+[cols="<,<,<",options="header",]
+|=======================================================================
+|Setting |Input type|Required
+| <<plugins-{type}s-{plugin}-access_key_id>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-aws_credentials_file>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-batch_max_count>> |<<number,number>>|No
+| <<plugins-{type}s-{plugin}-delivery_stream_name>> |<<string,string>>|Yes
+| <<plugins-{type}s-{plugin}-endpoint>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-proxy_uri>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-record_max_size_bytes>> |<<bytes,bytes>>|No
+| <<plugins-{type}s-{plugin}-record_total_max_size_bytes>> |<<bytes,bytes>>|No
+| <<plugins-{type}s-{plugin}-region>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-role_arn>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-role_session_name>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-secret_access_key>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-session_token>> |<<string,string>>|No
+| <<plugins-{type}s-{plugin}-use_aws_bundled_ca>> |<<boolean,boolean>>|No
+|=======================================================================
+
+Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all output plugins.
+
+&nbsp;
+
+[id="plugins-{type}s-{plugin}-access_key_id"]
+===== `access_key_id`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+This plugin uses the AWS SDK and supports several ways to get credentials, which will be tried in this order:
+
+1. Static configuration, using `access_key_id` and `secret_access_key` params in logstash plugin config
+2. External credentials file specified by `aws_credentials_file`
+3. Environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
+4. Environment variables `AMAZON_ACCESS_KEY_ID` and `AMAZON_SECRET_ACCESS_KEY`
+5. IAM Instance Profile (available when running inside EC2)
+
+[id="plugins-{type}s-{plugin}-aws_credentials_file"]
+===== `aws_credentials_file`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+Path to YAML file containing a hash of AWS credentials.
+This file will only be loaded if `access_key_id` and
+`secret_access_key` aren't set.
+The contents of the file should look like this:
+
+[source,ruby]
+----------------------------------
+    :access_key_id: "12345"
+    :secret_access_key: "54321"
+----------------------------------
+
+[id="plugins-{type}s-{plugin}-batch_max_count"]
+===== `batch_max_count`
+
+* Value type is <<number,number>>
+* Default value is `500`
+
+The maximum number of records to be sent in each batch.
+
+[id="plugins-{type}s-{plugin}-delivery_stream_name"]
+===== `delivery_stream_name`
+
+* Value type is <<string,string>>
+
+The name of the delivery stream.
+Note that this is just the name of the stream, not the URL or ARN.
+
+[id="plugins-{type}s-{plugin}-endpoint"]
+===== `endpoint`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+The endpoint to connect to.
+By default it is constructed using the value of `region`.
+This is useful when connecting to Firehose-compatible services or custom endpoints, but beware that these aren't guaranteed to work correctly with the AWS SDK.
+
+[id="plugins-{type}s-{plugin}-record_max_size_bytes"]
+===== `record_max_size_bytes`
+
+* Value type is <<bytes,bytes>>
+* Default value is `1_000_000`
+
+The maximum number of bytes for any record sent to Firehose.
+Messages exceeding this size will be dropped.
+See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+
+[id="plugins-{type}s-{plugin}-record_total_max_size_bytes"]
+===== `record_total_max_size_bytes`
+
+* Value type is <<bytes,bytes>>
+* Default value is `4_000_000`
+
+The maximum number of bytes for all records sent to Firehose.
+See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
+
+[id="plugins-{type}s-{plugin}-proxy_uri"]
+===== `proxy_uri`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+URI to proxy server if required
+
+[id="plugins-{type}s-{plugin}-region"]
+===== `region`
+
+* Value type is <<string,string>>
+* Default value is `"us-east-1"`
+
+The AWS Region
+
+[id="plugins-{type}s-{plugin}-role_arn"]
+===== `role_arn`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+The AWS IAM Role to assume, if any.
+This is used to generate temporary credentials, typically for cross-account access.
+See the https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html[AssumeRole API documentation] for more information.
+
+[id="plugins-{type}s-{plugin}-role_session_name"]
+===== `role_session_name`
+
+* Value type is <<string,string>>
+* Default value is `"logstash"`
+
+Session name to use when assuming an IAM role.
+
+[id="plugins-{type}s-{plugin}-secret_access_key"]
+===== `secret_access_key`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+The AWS Secret Access Key
+
+[id="plugins-{type}s-{plugin}-session_token"]
+===== `session_token`
+
+* Value type is <<string,string>>
+* There is no default value for this setting.
+
+The AWS Session token for temporary credentials
+
+[id="plugins-{type}s-{plugin}-use_aws_bundled_ca"]
+===== `use_aws_bundled_ca`
+
+* Value type is <<boolean,boolean>>
+* Default value is `false`
+
+Use bundled CA certificates that ship with the AWS SDK to verify SSL peer certificates.
+For cases where the default certificates are unavailable, e.g. Windows, you can set this to `true`.
+
+[id="plugins-{type}s-{plugin}-common-options"]
+include::{include_path}/{type}.asciidoc[]
+
+:default_codec!:
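==== Usage example

A minimal pipeline configuration for this output might look like the sketch below. It assumes a delivery stream named `my-firehose-stream` (a hypothetical name) already exists in `eu-west-1` and that credentials are resolved through one of the mechanisms described above; adjust the stream name, region, and batch settings to your environment.

[source,ruby]
----------------------------------
output {
  firehose {
    delivery_stream_name => "my-firehose-stream"   # hypothetical stream name
    region => "eu-west-1"
    # Optional: keep batches within the PutRecordBatch service limits described above
    batch_max_count => 500
    record_max_size_bytes => 1000000
    record_total_max_size_bytes => 4000000
  }
}
----------------------------------

Events are encoded with the default `json` codec, buffered into batches, and sent with a single `PutRecordBatch` call per batch; records larger than `record_max_size_bytes` are dropped rather than sent.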