Add Firehose output #47

Open · wants to merge 2 commits into base: main
1 change: 1 addition & 0 deletions docs/index.asciidoc
@@ -29,6 +29,7 @@ The AWS Integration Plugin provides integrated plugins for working with Amazon W
* {logstash-ref}/plugins-inputs-s3.html[S3 Input Plugin]
* {logstash-ref}/plugins-inputs-sqs.html[Sqs Input Plugin]
* {logstash-ref}/plugins-outputs-cloudwatch.html[Cloudwatch Output Plugin]
* {logstash-ref}/plugins-outputs-firehose.html[Firehose Output Plugin]
* {logstash-ref}/plugins-outputs-s3.html[S3 Output Plugin]
* {logstash-ref}/plugins-outputs-sns.html[Sns Output Plugin]
* {logstash-ref}/plugins-outputs-sqs.html[Sqs Output Plugin]
215 changes: 215 additions & 0 deletions docs/output-firehose.asciidoc
@@ -0,0 +1,215 @@
:integration: aws
:plugin: firehose
:type: output
:default_codec: json

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
///////////////////////////////////////////
:version: %VERSION%
:release_date: %RELEASE_DATE%
:changelog_url: %CHANGELOG_URL%
:include_path: ../../../../logstash/docs/include
///////////////////////////////////////////
END - GENERATED VARIABLES, DO NOT EDIT!
///////////////////////////////////////////

[id="plugins-{type}s-{plugin}"]
=== Firehose output plugin

include::{include_path}/plugin_header-integration.asciidoc[]

==== Description

Push events to an Amazon Web Services (AWS) Data Firehose delivery stream.

Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon services or HTTP endpoints owned by supported third-party service providers.
See: https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html

This plugin uses the AWS SDK to send data to the Firehose delivery stream.
See: https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html#writing-with-sdk

Your identity must have the following permission on the stream:

* `firehose:PutRecordBatch`

==== Batch Publishing

This output publishes messages to Firehose in batches in order to optimize event throughput and increase performance.
This is done using the `PutRecordBatch` API.
See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

When publishing messages to Firehose in batches, the following service limits must be respected:

* Each `PutRecordBatch` request supports up to 500 records.
* Each record in the request can be as large as 1,000 KB.
* All records in the request together can be as large as 4 MB.

This plugin dynamically adjusts the size of each batch published to Firehose so that the total payload does not exceed these limits.
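
A minimal pipeline sketch is shown below; the delivery stream name and region are placeholder values for your own setup:

[source,ruby]
----------------------------------
output {
  firehose {
    delivery_stream_name => "my-delivery-stream"
    region => "eu-west-1"
  }
}
----------------------------------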

[id="plugins-{type}s-{plugin}-options"]

==== Firehose Output Configuration Options

This plugin supports the following configuration options plus the <<plugins-{type}s-{plugin}-common-options>> described later.

[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-access_key_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-aws_credentials_file>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-batch_max_count>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-delivery_stream_name>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-endpoint>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-proxy_uri>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-region>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-record_max_size_bytes>> |<<bytes,bytes>>|No
| <<plugins-{type}s-{plugin}-record_total_max_size_bytes>> |<<bytes,bytes>>|No
| <<plugins-{type}s-{plugin}-role_arn>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-role_session_name>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-secret_access_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-session_token>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-use_aws_bundled_ca>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all output plugins.

&nbsp;

[id="plugins-{type}s-{plugin}-access_key_id"]
===== `access_key_id`

* Value type is <<string,string>>
* There is no default value for this setting.

This plugin uses the AWS SDK and supports several ways to get credentials, which will be tried in this order:

1. Static configuration, using `access_key_id` and `secret_access_key` params in the Logstash plugin config
2. External credentials file specified by `aws_credentials_file`
3. Environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
4. Environment variables `AMAZON_ACCESS_KEY_ID` and `AMAZON_SECRET_ACCESS_KEY`
5. IAM Instance Profile (available when running inside EC2)
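
For example, a sketch using static credentials set directly in the plugin configuration (placeholder values shown):

[source,ruby]
----------------------------------
output {
  firehose {
    delivery_stream_name => "my-delivery-stream"
    access_key_id => "AKIAIOSFODNN7EXAMPLE"
    secret_access_key => "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
  }
}
----------------------------------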

[id="plugins-{type}s-{plugin}-aws_credentials_file"]
===== `aws_credentials_file`

* Value type is <<string,string>>
* There is no default value for this setting.

Path to YAML file containing a hash of AWS credentials.
This file will only be loaded if `access_key_id` and
`secret_access_key` aren't set.
The contents of the file should look like this:

[source,ruby]
----------------------------------
:access_key_id: "12345"
:secret_access_key: "54321"
----------------------------------

[id="plugins-{type}s-{plugin}-batch_max_count"]
===== `batch_max_count`

* Value type is <<number,number>>
* Default value is `500`

The maximum number of records to be sent in each batch.

[id="plugins-{type}s-{plugin}-delivery_stream_name"]
===== `delivery_stream_name`

* This is a required setting.
* Value type is <<string,string>>
* There is no default value for this setting.

The name of the delivery stream.
Note that this is just the name of the stream, not the URL or ARN.

[id="plugins-{type}s-{plugin}-endpoint"]
===== `endpoint`

* Value type is <<string,string>>
* There is no default value for this setting.

The endpoint to connect to.
By default it is constructed using the value of `region`.
This is useful when connecting to Firehose-compatible services or custom endpoints, but beware that these aren't guaranteed to work correctly with the AWS SDK.

[id="plugins-{type}s-{plugin}-record_max_size_bytes"]
===== `record_max_size_bytes`

* Value type is <<bytes,bytes>>
* Default value is `1_000_000`

The maximum number of bytes for any record sent to Firehose.
Messages exceeding this size will be dropped.
See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

[id="plugins-{type}s-{plugin}-record_total_max_size_bytes"]
===== `record_total_max_size_bytes`

* Value type is <<bytes,bytes>>
* Default value is `4_000_000`

The maximum total size in bytes of all records in a single batch sent to Firehose.
See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html

[id="plugins-{type}s-{plugin}-proxy_uri"]
===== `proxy_uri`

* Value type is <<string,string>>
* There is no default value for this setting.

URI of the proxy server, if required.

[id="plugins-{type}s-{plugin}-region"]
===== `region`

* Value type is <<string,string>>
* Default value is `"us-east-1"`

The AWS Region.

[id="plugins-{type}s-{plugin}-role_arn"]
===== `role_arn`

* Value type is <<string,string>>
* There is no default value for this setting.

The AWS IAM Role to assume, if any.
This is used to generate temporary credentials, typically for cross-account access.
See the https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html[AssumeRole API documentation] for more information.
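
For example, a sketch that assumes a role for cross-account delivery (placeholder ARN shown):

[source,ruby]
----------------------------------
output {
  firehose {
    delivery_stream_name => "my-delivery-stream"
    role_arn => "arn:aws:iam::123456789012:role/logstash-firehose-writer"
  }
}
----------------------------------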

[id="plugins-{type}s-{plugin}-role_session_name"]
===== `role_session_name`

* Value type is <<string,string>>
* Default value is `"logstash"`

Session name to use when assuming an IAM role.

[id="plugins-{type}s-{plugin}-secret_access_key"]
===== `secret_access_key`

* Value type is <<string,string>>
* There is no default value for this setting.

The AWS Secret Access Key.

[id="plugins-{type}s-{plugin}-session_token"]
===== `session_token`

* Value type is <<string,string>>
* There is no default value for this setting.

The AWS session token for temporary credentials.

[id="plugins-{type}s-{plugin}-use_aws_bundled_ca"]
===== `use_aws_bundled_ca`

* Value type is <<boolean,boolean>>
* Default value is `false`

Use bundled CA certificates that ship with AWS SDK to verify SSL peer certificates.
For cases where the default certificates are unavailable, e.g. Windows, you can set this to `true`.

[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]

:default_codec!:
143 changes: 143 additions & 0 deletions lib/logstash/outputs/firehose.rb
@@ -0,0 +1,143 @@
require "logstash/outputs/base"
require "logstash/namespace"
require 'logstash/plugin_mixins/aws_config'

require "aws-sdk-firehose"

# Push events to an Amazon Web Services (AWS) Data Firehose.
#
# Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations
# such as Amazon services or HTTP endpoints owned by supported third-party service providers.
# See : https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html
#
# This plugin uses the AWS SDK to send data to the Firehose stream.
# See https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html#writing-with-sdk
#
# Your identity must have the following permissions on the stream:
# * `firehose:PutRecordBatch`
#
# ==== Batch Publishing
# This output publishes messages to Firehose in batches in order to optimize event throughput and increase performance.
# This is done using the `PutRecordBatch` API.
# See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
#
# When publishing messages to Firehose in batches, the following service limits must be respected:
# * Each PutRecordBatch request supports up to 500 records.
# * Each record in the request can be as large as 1,000 KB.
# * All records in the request can be as large as 4 MB.
#
# This plugin will dynamically adjust the size of the batch published to Firehose in
# order to ensure that the total payload size does not exceed the limits.
#
class LogStash::Outputs::Firehose < LogStash::Outputs::Base
include LogStash::PluginMixins::AwsConfig::V2

RECORDS_MAX_BATCH_COUNT = 500
RECORD_MAX_SIZE_BYTES = 1_000_000
RECORD_TOTAL_MAX_SIZE_BYTES = 4_000_000
REQUEST_RETRY_INTERVAL_SECONDS = 2

config_name "firehose"
concurrency :shared
default :codec, 'json'

# The name of the delivery stream.
# Note that this is just the name of the stream, not the URL or ARN.
config :delivery_stream_name, :validate => :string, :required => true

# The maximum number of records to be sent in each batch.
config :batch_max_count, :validate => :number, :default => RECORDS_MAX_BATCH_COUNT

# The maximum number of bytes for any record sent to Firehose.
# Messages exceeding this size will be dropped.
# See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
config :record_max_size_bytes, :validate => :bytes, :default => RECORD_MAX_SIZE_BYTES

# The maximum number of bytes for all records sent to Firehose.
# See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
config :record_total_max_size_bytes, :validate => :bytes, :default => RECORD_TOTAL_MAX_SIZE_BYTES

def register
if @batch_max_count > RECORDS_MAX_BATCH_COUNT
raise LogStash::ConfigurationError, "The maximum batch size is #{RECORDS_MAX_BATCH_COUNT} records"
elsif @batch_max_count < 1
raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
end

if @record_max_size_bytes > RECORD_MAX_SIZE_BYTES
raise LogStash::ConfigurationError, "The maximum record size is #{RECORD_MAX_SIZE_BYTES}"
elsif @record_max_size_bytes < 1
raise LogStash::ConfigurationError, 'The record size must be greater than 0'
end

if @record_total_max_size_bytes > RECORD_TOTAL_MAX_SIZE_BYTES
raise LogStash::ConfigurationError, "The maximum message size is #{RECORD_TOTAL_MAX_SIZE_BYTES}"
elsif @record_total_max_size_bytes < 1
raise LogStash::ConfigurationError, 'The message size must be greater than 0'
end

@logger.info("New Firehose output", :delivery_stream_name => @delivery_stream_name,
:batch_max_count => @batch_max_count,
:record_max_size_bytes => @record_max_size_bytes,
:record_total_max_size_bytes => @record_total_max_size_bytes)
@firehose = Aws::Firehose::Client.new(aws_options_hash)
end

public def multi_receive_encoded(encoded_events)
return if encoded_events.empty?

@logger.debug("Multi receive encoded", :encoded_events => encoded_events)

records_bytes = 0
records = []

encoded_events.each do |_, encoded|

if encoded.bytesize > @record_max_size_bytes
@logger.warn('Record exceeds maximum length and will be dropped', :record => encoded, :size => encoded.bytesize)
next
end

if records.size >= @batch_max_count or (records_bytes + encoded.bytesize) > @record_total_max_size_bytes
put_record_batch(records)
records_bytes = 0
records = []
end

records_bytes += encoded.bytesize
records << { :data => encoded }
end

put_record_batch(records) unless records.empty?
end

def put_record_batch(records)
return if records.nil? or records.empty?

@logger.debug("Publishing records", :batch => records.size)

begin
put_response = @firehose.put_record_batch({
delivery_stream_name: @delivery_stream_name,
records: records
})
rescue => e
@logger.error("Encountered an unexpected error submitting a batch request, will retry",
message: e.message, exception: e.class, backtrace: e.backtrace)
Stud.stoppable_sleep(REQUEST_RETRY_INTERVAL_SECONDS)
retry
end

if put_response.failed_put_count == 0
@logger.debug("Published records successfully", :batch => records.size)
return
end

put_response.request_responses
.filter { |r| !r.error_code.nil? }
.each do |response|
@logger.warn('Record publish error, will be dropped', :response => response)
end unless put_response.request_responses.nil?

end
end
5 changes: 4 additions & 1 deletion logstash-integration-aws.gemspec
@@ -22,7 +22,8 @@ Gem::Specification.new do |s|
logstash-output-cloudwatch
logstash-output-s3
logstash-output-sns
logstash-output-sqs).join(",")
logstash-output-sqs
logstash-output-firehose).join(",")
}


@@ -45,12 +46,14 @@ Gem::Specification.new do |s|
s.add_runtime_dependency "aws-sdk-cloudwatch"
s.add_runtime_dependency "aws-sdk-cloudfront"
s.add_runtime_dependency "aws-sdk-resourcegroups"
s.add_runtime_dependency "aws-sdk-firehose"

s.add_development_dependency "logstash-codec-json_lines"
s.add_development_dependency "logstash-codec-multiline"
s.add_development_dependency "logstash-codec-json"
s.add_development_dependency "logstash-codec-line"
s.add_development_dependency "logstash-devutils"
s.add_development_dependency "logstash-input-generator"
s.add_development_dependency "logstash-mixin-ecs_compatibility_support"
s.add_development_dependency "timecop"
end