Skip to content

Commit 9c1a1cd

Browse files
committed
DI: chunk snapshot payloads
1 parent ae94edc commit 9c1a1cd

File tree

4 files changed

+131
-4
lines changed

4 files changed

+131
-4
lines changed

lib/datadog/di/transport/http.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@ def input(
4040
api_version: nil,
4141
headers: nil
4242
)
43-
Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
43+
Core::Transport::HTTP.build(
44+
api_instance_class: Input::API::Instance,
4445
logger: logger,
45-
agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
46+
agent_settings: agent_settings,
47+
api_version: api_version,
48+
headers: headers,
49+
) do |transport|
4650
apis = API.defaults
4751

4852
transport.api API::INPUT, apis[API::INPUT]

lib/datadog/di/transport/input.rb

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ def initialize(parcel, serialized_tags)
2424
class Transport
2525
attr_reader :client, :apis, :default_api, :current_api_id, :logger
2626

27+
# The maximum chunk size that intake permits is 10 MB.
28+
#
29+
# Two bytes are for the [ and ] of JSON array syntax.
30+
MAX_CHUNK_SIZE = 10 * 1024 * 1024 - 2
31+
32+
# Try to send smaller payloads to avoid large network requests.
33+
# If a single snapshot is larger than the default chunk size but is under the
34+
# max chunk size, it will still get sent out.
35+
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024
36+
2737
def initialize(apis, default_api, logger:)
2838
@apis = apis
2939
@logger = logger
@@ -36,9 +46,39 @@ def current_api
3646
end
3747

3848
def send_input(payload, tags)
39-
json = JSON.dump(payload)
40-
parcel = EncodedParcel.new(json)
49+
# Tags are the same for all chunks, serialize them one time.
4150
serialized_tags = Core::TagBuilder.serialize_tags(tags)
51+
52+
encoder = Core::Encoding::JSONEncoder
53+
encoded_snapshots = payload.map do |snapshot|
54+
encoder.encode(snapshot)
55+
end
56+
57+
Datadog::Core::Chunker.chunk_by_size(
58+
encoded_snapshots, DEFAULT_CHUNK_SIZE,
59+
).each do |chunk|
60+
if chunk.length == 1 && chunk.first.length > MAX_CHUNK_SIZE
61+
# Drop the chunk.
62+
# TODO report via telemetry metric?
63+
logger.debug { "di: dropping too big snapshot" }
64+
else
65+
chunked_payload = encoder.join(chunk)
66+
67+
# We need to rescue exceptions for each chunk so that
68+
# a failure in one chunk does not prevent sending the subsequent chunks.
69+
begin
70+
send_input_chunk(chunked_payload, serialized_tags)
71+
rescue => exc
72+
logger.debug { "di: failed to send snapshot chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
73+
end
74+
end
75+
end
76+
77+
payload
78+
end
79+
80+
def send_input_chunk(chunked_payload, serialized_tags)
81+
parcel = EncodedParcel.new(chunked_payload)
4282
request = Request.new(parcel, serialized_tags)
4383

4484
response = @client.send_input_payload(request)

sig/datadog/di/transport/input.rbs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,16 @@ module Datadog
3030
attr_reader current_api_id: untyped
3131

3232
attr_reader logger: untyped
33+
MAX_CHUNK_SIZE: untyped
34+
DEFAULT_CHUNK_SIZE: untyped
3335

3436
def initialize: (untyped apis, untyped default_api, logger: untyped) -> void
3537

3638
def current_api: () -> untyped
3739

3840
def send_input: (untyped payload, untyped tags) -> untyped
41+
42+
def send_input_chunk: (untyped chunked_payload, untyped serialized_tags) -> untyped
3943
end
4044
end
4145
end
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
require "datadog/di/spec_helper"
2+
require 'datadog/di/transport/http'
3+
4+
RSpec.describe Datadog::DI::Transport::Input::Transport do
5+
di_test
6+
7+
let(:transport) do
8+
Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
9+
end
10+
11+
let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }
12+
13+
let(:settings) do
14+
Datadog::Core::Configuration::Settings.new
15+
end
16+
17+
let(:logger) do
18+
instance_double(Logger)
19+
end
20+
21+
let(:tags) { [] }
22+
23+
context 'when the combined size of snapshots serialized exceeds intake max' do
24+
before do
25+
# Reduce limits to make the test run faster and not require a lot of memory
26+
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000_000)
27+
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000_000)
28+
end
29+
30+
let(:snapshot) do
31+
# It doesn't matter what the payload is, generate a fake one here.
32+
# This payload serializes to 9781 bytes of JSON.
33+
1000.times.map do |i|
34+
[i, i]
35+
end.to_h
36+
end
37+
38+
let(:snapshots) do
39+
# This serializes to 9782001 bytes of JSON - just under 10 MB.
40+
[snapshot] * 1_000
41+
end
42+
43+
it 'chunks snapshots' do
44+
# Just under 10 MB payload, default chunk size 1 MB, we expect 10 chunks
45+
expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, serialized_tags|
46+
expect(chunked_payload.length).to be < 1_000_000
47+
expect(chunked_payload.length).to be > 800_000
48+
end
49+
transport.send_input(snapshots, tags)
50+
end
51+
52+
context 'when individual snapshot exceeds intake max' do
53+
before do
54+
# Reduce limits even more to force a reasonably-sized snapshot to be dropped
55+
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000)
56+
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000)
57+
end
58+
59+
let(:small_snapshot) do
60+
20.times.map do |i|
61+
[i, i]
62+
end.to_h
63+
end
64+
65+
let(:snapshots) do
66+
[small_snapshot, snapshot]
67+
end
68+
69+
it 'drops snapshot that is too big' do
70+
expect(transport).to receive(:send_input_chunk).once do |chunked_payload, serialized_tags|
71+
expect(chunked_payload.length).to be < 1_000
72+
expect(chunked_payload.length).to be > 100
73+
end
74+
expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
75+
transport.send_input(snapshots, tags)
76+
end
77+
end
78+
end
79+
end

0 commit comments

Comments
 (0)