Commit b0ab99a

Add firehose output plugin and spec
1 parent 2a96fe1 commit b0ab99a

3 files changed: 258 additions, 1 deletion


lib/logstash/outputs/firehose.rb

Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/aws_config"
require "stud/interval"

require "aws-sdk-firehose"

# Push events to an Amazon Web Services (AWS) Data Firehose delivery stream.
#
# Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations
# such as Amazon services or HTTP endpoints owned by supported third-party service providers.
# See: https://docs.aws.amazon.com/firehose/latest/dev/what-is-this-service.html
#
# This plugin uses the AWS SDK to send data to the Firehose stream.
# See https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html#writing-with-sdk
#
# Your identity must have the following permissions on the stream:
# * `firehose:PutRecordBatch`
#
# ==== Batch Publishing
# This output publishes messages to Firehose in batches in order to optimize event throughput and increase performance.
# This is done using the `PutRecordBatch` API.
# See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
#
# When publishing messages to Firehose in batches, the following service limits must be respected:
# * Each PutRecordBatch request supports up to 500 records.
# * Each record in the request can be as large as 1,000 KB.
# * All records in the request can be as large as 4 MB.
#
# This plugin will dynamically adjust the size of the batch published to Firehose in
# order to ensure that the total payload size does not exceed the limits.
#
class LogStash::Outputs::Firehose < LogStash::Outputs::Base
  include LogStash::PluginMixins::AwsConfig::V2

  RECORDS_MAX_BATCH_COUNT = 500
  RECORD_MAX_SIZE_BYTES = 1_000_000
  RECORD_TOTAL_MAX_SIZE_BYTES = 4_000_000
  REQUEST_RETRY_INTERVAL_SECONDS = 2

  config_name "firehose"
  concurrency :shared
  default :codec, 'json'

  # The name of the delivery stream.
  # Note that this is just the name of the stream, not the URL or ARN.
  config :delivery_stream_name, :validate => :string, :required => true

  # The maximum number of records to be sent in each batch.
  config :batch_max_count, :validate => :number, :default => RECORDS_MAX_BATCH_COUNT

  # The maximum number of bytes for any record sent to Firehose.
  # Messages exceeding this size will be dropped.
  # See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
  config :record_max_size_bytes, :validate => :bytes, :default => RECORD_MAX_SIZE_BYTES

  # The maximum number of bytes for all records sent to Firehose.
  # See https://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
  config :record_total_max_size_bytes, :validate => :bytes, :default => RECORD_TOTAL_MAX_SIZE_BYTES

  def register
    if @batch_max_count > RECORDS_MAX_BATCH_COUNT
      raise LogStash::ConfigurationError, "The maximum batch size is #{RECORDS_MAX_BATCH_COUNT} records"
    elsif @batch_max_count < 1
      raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
    end

    if @record_max_size_bytes > RECORD_MAX_SIZE_BYTES
      raise LogStash::ConfigurationError, "The maximum record size is #{RECORD_MAX_SIZE_BYTES}"
    elsif @record_max_size_bytes < 1
      raise LogStash::ConfigurationError, 'The record size must be greater than 0'
    end

    if @record_total_max_size_bytes > RECORD_TOTAL_MAX_SIZE_BYTES
      raise LogStash::ConfigurationError, "The maximum message size is #{RECORD_TOTAL_MAX_SIZE_BYTES}"
    elsif @record_total_max_size_bytes < 1
      raise LogStash::ConfigurationError, 'The message size must be greater than 0'
    end

    @logger.info("New Firehose output", :delivery_stream_name => @delivery_stream_name,
                 :batch_max_count => @batch_max_count,
                 :record_max_size_bytes => @record_max_size_bytes,
                 :record_total_max_size_bytes => @record_total_max_size_bytes)
    @firehose = Aws::Firehose::Client.new(aws_options_hash)
  end

  public def multi_receive_encoded(encoded_events)
    return if encoded_events.empty?

    @logger.debug("Multi receive encoded", :encoded_events => encoded_events)

    records_bytes = 0
    records = []

    encoded_events.each do |_, encoded|
      if encoded.bytesize > @record_max_size_bytes
        @logger.warn('Record exceeds maximum length and will be dropped', :record => encoded, :size => encoded.bytesize)
        next
      end

      if records.size >= @batch_max_count or (records_bytes + encoded.bytesize) > @record_total_max_size_bytes
        put_record_batch(records)
        records_bytes = 0
        records = []
      end

      records_bytes += encoded.bytesize
      records << { :data => encoded }
    end

    put_record_batch(records) unless records.empty?
  end

  def put_record_batch(records)
    return if records.nil? or records.empty?

    @logger.debug("Publishing records", :batch => records.size)

    begin
      put_response = @firehose.put_record_batch({
        delivery_stream_name: @delivery_stream_name,
        records: records
      })
    rescue => e
      @logger.error("Encountered an unexpected error submitting a batch request, will retry",
                    message: e.message, exception: e.class, backtrace: e.backtrace)
      Stud.stoppable_sleep(REQUEST_RETRY_INTERVAL_SECONDS)
      retry
    end

    if put_response.failed_put_count == 0
      @logger.debug("Published records successfully", :batch => records.size)
      return
    end

    put_response.request_responses
      .filter { |r| !r.error_code.nil? }
      .each do |response|
        @logger.warn('Record publish error, will be dropped', :response => response)
      end unless put_response.request_responses.nil?
  end
end
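
For reference, a minimal sketch of a pipeline configuration that would exercise this output. The stream name and region below are placeholders; `region` and the credential options come from the shared `AwsConfig::V2` mixin rather than from this file, and the remaining settings simply restate their defaults:

output {
  firehose {
    region => "us-east-1"                     # placeholder; provided by the AWS config mixin
    delivery_stream_name => "my-stream"       # placeholder; the only required option
    batch_max_count => 500                    # default: RECORDS_MAX_BATCH_COUNT
    record_max_size_bytes => 1000000          # default: RECORD_MAX_SIZE_BYTES
    record_total_max_size_bytes => 4000000    # default: RECORD_TOTAL_MAX_SIZE_BYTES
  }
}

Events are encoded with the default json codec and published via PutRecordBatch in batches that respect the limits above.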

logstash-integration-aws.gemspec

Lines changed: 4 additions & 1 deletion
@@ -22,7 +22,8 @@ Gem::Specification.new do |s|
       logstash-output-cloudwatch
       logstash-output-s3
       logstash-output-sns
-      logstash-output-sqs).join(",")
+      logstash-output-sqs
+      logstash-output-firehose).join(",")
   }

@@ -45,12 +46,14 @@ Gem::Specification.new do |s|
   s.add_runtime_dependency "aws-sdk-cloudwatch"
   s.add_runtime_dependency "aws-sdk-cloudfront"
   s.add_runtime_dependency "aws-sdk-resourcegroups"
+  s.add_runtime_dependency "aws-sdk-firehose"

   s.add_development_dependency "logstash-codec-json_lines"
   s.add_development_dependency "logstash-codec-multiline"
   s.add_development_dependency "logstash-codec-json"
   s.add_development_dependency "logstash-codec-line"
   s.add_development_dependency "logstash-devutils"
   s.add_development_dependency "logstash-input-generator"
+  s.add_development_dependency "logstash-mixin-ecs_compatibility_support"
   s.add_development_dependency "timecop"
 end

spec/outputs/firehose_spec.rb

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
require 'logstash/outputs/firehose'

describe LogStash::Outputs::Firehose do

  let(:configuration) { { "delivery_stream_name" => "test" } }
  let(:output) { LogStash::Plugin.lookup("output", "firehose").new(configuration) }

  describe "#register" do

    context "when no delivery stream is specified" do
      let(:configuration) { {} }
      it "fails with a configuration error" do
        expect { output.register }.to raise_error(LogStash::ConfigurationError)
      end
    end

    context "when batch max count is out of bounds" do
      [0, LogStash::Outputs::Firehose::RECORDS_MAX_BATCH_COUNT + 1].each do |batch_max_count|
        let(:configuration) { { "delivery_stream_name" => "test", "batch_max_count" => batch_max_count } }
        it "fails with a configuration error" do
          expect { output.register }.to raise_error(LogStash::ConfigurationError)
        end
      end
    end

    context "when record max size is out of bounds" do
      [0, LogStash::Outputs::Firehose::RECORD_MAX_SIZE_BYTES + 1].each do |record_max_size_bytes|
        let(:configuration) { { "delivery_stream_name" => "test", "record_max_size_bytes" => record_max_size_bytes } }
        it "fails with a configuration error" do
          expect { output.register }.to raise_error(LogStash::ConfigurationError)
        end
      end
    end

    context "when record total max size is out of bounds" do
      [0, LogStash::Outputs::Firehose::RECORD_TOTAL_MAX_SIZE_BYTES + 1].each do |record_total_max_size_bytes|
        let(:configuration) { { "delivery_stream_name" => "test", "record_total_max_size_bytes" => record_total_max_size_bytes } }
        it "fails with a configuration error" do
          expect { output.register }.to raise_error(LogStash::ConfigurationError)
        end
      end
    end
  end

  describe "#multi_receive_encoded" do

    context "when records are empty" do
      it "does not push" do
        expect(output).not_to receive(:put_record_batch)
        output.multi_receive_encoded([])
      end
    end

    context "when a record is too big" do
      it "does not put" do
        output.instance_variable_set(:@record_max_size_bytes, 1)
        expect(output).not_to receive(:put_record_batch)
        output.multi_receive_encoded([[nil, "{}"]])
      end
    end

    context "when receiving events" do

      event1 = "{one}"
      event2 = "{two}"
      event3 = "{three}"

      it "splits batches by count" do
        output.instance_variable_set(:@batch_max_count, 2)
        expect(output).to receive(:put_record_batch).once.with([{ :data => event1 }, { :data => event2 }])
        expect(output).to receive(:put_record_batch).once.with([{ :data => event3 }])
        output.multi_receive_encoded([[nil, event1], [nil, event2], [nil, event3]])
      end

      it "splits batches by size" do
        output.instance_variable_set(:@record_total_max_size_bytes, event1.bytesize + event2.bytesize)
        expect(output).to receive(:put_record_batch).once.with([{ :data => event1 }, { :data => event2 }])
        expect(output).to receive(:put_record_batch).once.with([{ :data => event3 }])
        output.multi_receive_encoded([[nil, event1], [nil, event2], [nil, event3]])
      end
    end

  end

  describe "#put_record_batch" do

    let(:firehose_double) { instance_double(Aws::Firehose::Client) }

    before do
      allow(Aws::Firehose::Client).to receive(:new).and_return(firehose_double)
      output.register
    end

    context "when records are empty" do
      it "does not push" do
        expect(firehose_double).not_to receive(:put_record_batch)
        output.put_record_batch([])
      end
    end

    context "when Firehose throws an exception" do
      it "retries" do
        expect(firehose_double).to receive(:put_record_batch).twice.and_invoke(
          proc { |_| raise RuntimeError.new('Put failed') },
          proc { |_| Aws::Firehose::Types::PutRecordBatchOutput.new }
        )
        output.put_record_batch(["test_record"])
      end
    end
  end
end
