Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions lib/datadog/di/transport/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ def input(
api_version: nil,
headers: nil
)
Core::Transport::HTTP.build(api_instance_class: Input::API::Instance,
Core::Transport::HTTP.build(
api_instance_class: Input::API::Instance,
logger: logger,
agent_settings: agent_settings, api_version: api_version, headers: headers) do |transport|
agent_settings: agent_settings,
api_version: api_version,
headers: headers,
) do |transport|
apis = API.defaults

transport.api API::INPUT, apis[API::INPUT]
Expand Down
49 changes: 47 additions & 2 deletions lib/datadog/di/transport/input.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# frozen_string_literal: true

require_relative '../../core/chunker'
require_relative '../../core/encoding'
require_relative '../../core/tag_builder'
require_relative '../../core/transport/parcel'
require_relative '../../core/transport/request'
require_relative '../error'
require_relative 'http/client'

module Datadog
Expand All @@ -24,6 +29,16 @@ def initialize(parcel, serialized_tags)
class Transport
attr_reader :client, :apis, :default_api, :current_api_id, :logger

# The maximum chunk size that the intake permits is 10 MB.
#
# Two bytes are subtracted to account for the [ and ] of the
# JSON array syntax that wraps the joined snapshots.
MAX_CHUNK_SIZE = 10 * 1024 * 1024 - 2

# Try to send smaller payloads to avoid large network requests.
# If a payload is larger than the default chunk size but is under the
# max chunk size, it will still get sent out.
DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024

def initialize(apis, default_api, logger:)
@apis = apis
@logger = logger
Expand All @@ -36,9 +51,39 @@ def current_api
end

# Serializes +payload+ (an array of snapshots), splits the
# serialized snapshots into chunks no larger than
# DEFAULT_CHUNK_SIZE, and sends each chunk to the agent in a
# separate network request.
#
# A chunk containing a single snapshot that still exceeds
# MAX_CHUNK_SIZE cannot be split further and is dropped (logged
# at debug level). A failure to send one chunk does not prevent
# subsequent chunks from being attempted.
#
# Returns the original +payload+.
def send_input(payload, tags)
  # Tags are the same for all chunks, serialize them one time.
  serialized_tags = Core::TagBuilder.serialize_tags(tags)

  encoder = Core::Encoding::JSONEncoder
  encoded_snapshots = payload.map do |snapshot|
    encoder.encode(snapshot)
  end

  Datadog::Core::Chunker.chunk_by_size(
    encoded_snapshots, DEFAULT_CHUNK_SIZE,
  ).each do |chunk|
    # A single snapshot above the intake's hard limit can never be
    # sent; multi-snapshot chunks are already bounded by the chunker.
    if chunk.length == 1 && chunk.first.length > MAX_CHUNK_SIZE
      # Drop the chunk.
      # TODO report via telemetry metric?
      logger.debug { "di: dropping too big snapshot" }
    else
      chunked_payload = encoder.join(chunk)

      # We need to rescue exceptions for each chunk so that
      # subsequent chunks are attempted to be sent.
      begin
        send_input_chunk(chunked_payload, serialized_tags)
      rescue => exc
        logger.debug { "di: failed to send snapshot chunk: #{exc.class}: #{exc} (at #{exc.backtrace.first})" }
      end
    end
  end

  payload
end

def send_input_chunk(chunked_payload, serialized_tags)
parcel = EncodedParcel.new(chunked_payload)
request = Request.new(parcel, serialized_tags)

response = @client.send_input_payload(request)
Expand Down
4 changes: 4 additions & 0 deletions sig/datadog/di/transport/input.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ module Datadog
attr_reader current_api_id: untyped

attr_reader logger: untyped
MAX_CHUNK_SIZE: untyped
DEFAULT_CHUNK_SIZE: untyped

def initialize: (untyped apis, untyped default_api, logger: untyped) -> void

def current_api: () -> untyped

def send_input: (untyped payload, untyped tags) -> untyped

def send_input_chunk: (untyped chunked_payload, untyped serialized_tags) -> untyped
end
end
end
Expand Down
79 changes: 79 additions & 0 deletions spec/datadog/di/transport/input_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
require "datadog/di/spec_helper"
require 'datadog/di/transport/http'

# Integration-style spec for the DI input transport's payload chunking:
# snapshots are serialized, grouped into chunks under DEFAULT_CHUNK_SIZE,
# and individual snapshots above MAX_CHUNK_SIZE are dropped.
RSpec.describe Datadog::DI::Transport::Input::Transport do
di_test

# Transport under test, built against default agent settings.
let(:transport) do
Datadog::DI::Transport::HTTP.input(agent_settings: agent_settings, logger: logger)
end

let(:agent_settings) { Datadog::Core::Configuration::AgentSettingsResolver.call(settings, logger: nil) }

let(:settings) do
Datadog::Core::Configuration::Settings.new
end

# Verifying double so unexpected logger calls fail the test.
let(:logger) do
instance_double(Logger)
end

let(:tags) { [] }

context 'when the combined size of snapshots serialized exceeds intake max' do
before do
# Reduce limits to make the test run faster and not require a lot of memory
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000_000)
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000_000)
end

let(:snapshot) do
# It doesn't matter what the payload is, generate a fake one here.
# This payload serializes to 9781 bytes of JSON.
1000.times.map do |i|
[i, i]
end.to_h
end

let(:snapshots) do
# This serializes to 9782001 bytes of JSON - just under 10 MB.
[snapshot] * 1_000
end

it 'chunks snapshots' do
# Just under 10 MB payload, default chunk size 1 MB, we expect 10 chunks
expect(transport).to receive(:send_input_chunk).exactly(10).times do |chunked_payload, serialized_tags|
# Each chunk stays under the (stubbed) default chunk size but is
# reasonably full, i.e. the chunker is packing, not fragmenting.
expect(chunked_payload.length).to be < 1_000_000
expect(chunked_payload.length).to be > 800_000
end
transport.send_input(snapshots, tags)
end

context 'when individual snapshot exceeds intake max' do
before do
# Reduce limits even more to force a reasonably-sized snapshot to be dropped
stub_const('Datadog::DI::Transport::Input::Transport::DEFAULT_CHUNK_SIZE', 1_000)
stub_const('Datadog::DI::Transport::Input::Transport::MAX_CHUNK_SIZE', 2_000)
end

# Small enough to fit under the stubbed 2_000-byte max chunk size.
let(:small_snapshot) do
20.times.map do |i|
[i, i]
end.to_h
end

# The outer-context 9781-byte snapshot now exceeds the stubbed max
# and must be dropped; only the small snapshot should be sent.
let(:snapshots) do
[small_snapshot, snapshot]
end

it 'drops snapshot that is too big' do
expect(transport).to receive(:send_input_chunk).once do |chunked_payload, serialized_tags|
expect(chunked_payload.length).to be < 1_000
expect(chunked_payload.length).to be > 100
end
# The oversized snapshot is reported only via a debug-level log line.
expect_lazy_log(logger, :debug, 'di: dropping too big snapshot')
transport.send_input(snapshots, tags)
end
end
end
end