Skip to content

Commit cb2c246

Browse files
committed
DI: chunk snapshot payloads
1 parent ae94edc commit cb2c246

File tree

3 files changed

+126
-4
lines changed

3 files changed

+126
-4
lines changed

lib/datadog/di/transport/http.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ def input(
4040
api_version: nil,
4141
headers: nil
4242
)
43-
Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
43+
Core::Transport::HTTP.build(
44+
api_instance_class: Input::API::Instance,
4445
logger: logger,
45-
agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
46+
agent_settings: agent_settings,
47+
api_version: api_version,
48+
headers: headers,
49+
) do |transport|
4650
apis = API.defaults
4751

4852
transport.api API::INPUT, apis[API::INPUT]

lib/datadog/di/transport/input.rb

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ def initialize(parcel, serialized_tags)
2424
class Transport
2525
attr_reader :client, :apis, :default_api, :current_api_id, :logger
2626

27+
# The maximum chunk size that intake permits is 10 MB.
28+
#
29+
# Two bytes are reserved for the [ and ] of JSON array syntax.
30+
MAX_CHUNK_SIZE = 10 * 1024 * 1024 - 2
31+
32+
# Try to send smaller payloads to avoid large network requests.
33+
# If a payload is larger than default chunk size but is under the
34+
# max chunk size, it will still get sent out.
35+
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024
36+
2737
def initialize(apis, default_api, logger:)
2838
@apis = apis
2939
@logger = logger
@@ -36,9 +46,39 @@ def current_api
3646
end
3747

3848
def send_input(payload, tags)
39-
json = JSON.dump(payload)
40-
parcel = EncodedParcel.new(json)
49+
# Tags are the same for all chunks, serialize them one time.
4150
serialized_tags = Core::TagBuilder.serialize_tags(tags)
51+
52+
encoder = Core::Encoding::JSONEncoder
53+
encoded_snapshots = payload.map do |snapshot|
54+
encoder.encode(snapshot)
55+
end
56+
57+
Datadog::Core::Chunker.chunk_by_size(
58+
encoded_snapshots, DEFAULT_CHUNK_SIZE,
59+
).each do |chunk|
60+
if chunk.length == 1 && chunk.first.length > MAX_CHUNK_SIZE
61+
# Drop the chunk.
62+
# TODO report via telemetry metric?
63+
logger.debug { "di: dropping too big snapshot" }
64+
else
65+
chunked_payload = encoder.join(chunk)
66+
67+
# We need to rescue exceptions for each chunk so that
68+
# we still attempt to send the subsequent chunks.
69+
begin
70+
send_input_chunk(chunked_payload, serialized_tags)
71+
rescue => exc
72+
logger.debug { "di: failed to send input chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
73+
end
74+
end
75+
end
76+
77+
payload
78+
end
79+
80+
def send_input_chunk(chunked_payload, serialized_tags)
81+
parcel = EncodedParcel.new(chunked_payload)
4282
request = Request.new(parcel, serialized_tags)
4383

4484
response = @client.send_input_payload(request)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
require "datadog/di/spec_helper"
2+
3+
RSpec.describe Datadog::DI::Transport::Input::Transport do
4+
di_test
5+
6+
let(:transport) do
7+
Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
8+
end
9+
10+
let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }
11+
12+
let(:settings) do
13+
Datadog::Core::Configuration::Settings.new
14+
end
15+
16+
let(:logger) do
17+
instance_double(Logger)
18+
end
19+
20+
let(:tags) { [] }
21+
22+
context 'when the combined size of snapshots serialized exceeds intake max' do
23+
before do
24+
# Reduce limits to make the test run faster and not require a lot of memory
25+
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000_000)
26+
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000_000)
27+
end
28+
29+
let(:snapshot) do
30+
# It doesn't matter what the payload is, generate a fake one here.
31+
# This payload serializes to 9781 bytes of JSON.
32+
1000.times.map do |i|
33+
[i, i]
34+
end.to_h
35+
end
36+
37+
let(:snapshots) do
38+
# This serializes to 9782001 bytes of JSON - just under 10 MB.
39+
[snapshot] * 1_000
40+
end
41+
42+
it 'chunks snapshots' do
43+
# Just under 10 MB payload, default chunk size 1 MB, we expect 10 chunks
44+
expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, serialized_tags|
45+
expect(chunked_payload.length).to be < 1_000_000
46+
expect(chunked_payload.length).to be > 800_000
47+
end
48+
transport.send_input(snapshots, tags)
49+
end
50+
51+
context 'when individual snapshot exceeds intake max' do
52+
before do
53+
# Reduce limits even more to force a reasonably-sized snapshot to be dropped
54+
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000)
55+
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000)
56+
end
57+
58+
let(:small_snapshot) do
59+
20.times.map do |i|
60+
[i, i]
61+
end.to_h
62+
end
63+
64+
let(:snapshots) do
65+
[small_snapshot, snapshot]
66+
end
67+
68+
it 'drops snapshot that is too big' do
69+
expect(transport).to receive(:send_input_chunk).once do |chunked_payload, serialized_tags|
70+
expect(chunked_payload.length).to be < 1_000
71+
expect(chunked_payload.length).to be > 100
72+
end
73+
expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
74+
transport.send_input(snapshots, tags)
75+
end
76+
end
77+
end
78+
end

0 commit comments

Comments
 (0)