Merged
Changes from 63 commits
Commits
71 commits
fe953e6
Add DSM and Karafka integration support
ericfirth Sep 26, 2025
063adef
Add periodic scheduler and comprehensive tests for DataStreams
ericfirth Sep 26, 2025
e3ad6e1
Add API consistency methods for DataStreams checkpoints
ericfirth Sep 26, 2025
7b0998d
Update integrations to use new API methods with idiomatic Ruby blocks
ericfirth Sep 26, 2025
bff020c
Remove redundant pathway context encoding in Kafka producer
ericfirth Sep 26, 2025
540733c
Replace noisy puts statements with proper debug logging
ericfirth Sep 26, 2025
fc17a47
Fix Kafka consumer DSM implementation and remove noisy logging
ericfirth Sep 26, 2025
6a8352c
Implement proper DSM in Kafka event handlers
ericfirth Sep 26, 2025
66b2df8
Restore proper DSM implementation in Kafka consumer instrumentation
ericfirth Sep 26, 2025
9965af0
Use prepend instead of include for better readability
ericfirth Oct 17, 2025
ca3881c
Remove comments and make changes for readability suggested by reviewer
ericfirth Oct 17, 2025
ac0b0eb
Update set_consume_checkpoint & set_produce_checkpoint to use keyword…
ericfirth Oct 17, 2025
b3feabd
Type, as best as we can, the methods
ericfirth Oct 17, 2025
7b1c1a6
Move data_streams so that its not namespaced by tracing
ericfirth Oct 17, 2025
29b8d8f
Add debug logs to DSM processor to trace periodic flush
ericfirth Oct 17, 2025
e52e571
Add more debug logs to trace checkpoint creation
ericfirth Oct 17, 2025
3d00638
Fix DSM agent host/port handling and add debug logs
ericfirth Oct 17, 2025
2b12c4d
Clean up debug logs from DSM processor
ericfirth Oct 17, 2025
d806b80
Remove many superfluous comments
ericfirth Oct 20, 2025
f3b715d
Remove excess comments and move full_pathway calculation to where it …
ericfirth Oct 20, 2025
b87dc9c
Add an bucket stats test
ericfirth Oct 20, 2025
b57bd32
Make public API more grokable and stop exposing via the core/configur…
ericfirth Oct 22, 2025
763c19e
A bunch of fixes to pass in constants to Data Streams at initializati…
ericfirth Oct 22, 2025
c3e5eb3
Use Time as an object rather than as a float representing seconds
ericfirth Oct 22, 2025
55351d8
Set checkpoint uses keyword args to better signal the arguments that …
ericfirth Oct 22, 2025
a405486
Privatize the encoding
ericfirth Oct 22, 2025
2109feb
Make it so Data Streams is safer so that when enabled is true but DDS…
ericfirth Oct 22, 2025
33ee888
Rescue any errors setting checkpoints sends
ericfirth Oct 22, 2025
0d5dc58
Add transport layer to be able to transport on UDP, etc
ericfirth Oct 22, 2025
4d93f01
Remove a change that doesn't need to be there
ericfirth Oct 22, 2025
b6d9f8e
Add a return instead of a raise when decoding
ericfirth Oct 22, 2025
e0c577c
Remove circular dependency
ericfirth Oct 22, 2025
d7b1dda
CI fixes
ericfirth Oct 22, 2025
d47e60b
Apply suggestions from code review
ericfirth Oct 23, 2025
1a8310c
Address DSM PR review comments
ericfirth Oct 24, 2025
70c6520
Add skip_if_data_streams_not_supported helper to prevent silent test …
ericfirth Oct 24, 2025
dd96c21
Fix Steep type checking errors
ericfirth Oct 24, 2025
647f453
Move require 'datadog' to top of kafka data_streams_spec.rb
ericfirth Oct 24, 2025
9b9cfb8
Skip Data Streams tests instead of raising when DDSketch unavailable
ericfirth Oct 24, 2025
0ee4cb1
Exclude Data Streams tests from :main rake task (Nix environments)
ericfirth Oct 24, 2025
10cc108
Exclude entire data_streams directory from :main rake task
ericfirth Oct 24, 2025
14d1b75
Add Data Streams tests to CORE_WITH_LIBDATADOG_API
ericfirth Oct 24, 2025
fa38e10
Remove contrib data_streams_spec from CORE_WITH_LIBDATADOG_API
ericfirth Oct 24, 2025
ca3f8db
Fix processor_spec to pass required interval and logger arguments
ericfirth Oct 24, 2025
a9171b2
Add require 'msgpack' and 'zlib' to transport/stats.rb
ericfirth Oct 24, 2025
4425a6a
Fix Kafka tests and clean up DataStreams Processor
ericfirth Oct 24, 2025
68130b3
Fix AgentSettings constant and steep type signature
ericfirth Oct 24, 2025
350a0dc
Fix processor spec mocks and HTTP transport stubbing
ericfirth Oct 24, 2025
2f79488
Update lib/datadog/tracing/contrib/karafka/patcher.rb
ericfirth Oct 24, 2025
e02ed61
Update Rakefile
ericfirth Oct 27, 2025
43e1f23
Change the requires such that there is no circular dependency but tes…
ericfirth Oct 27, 2025
9de9c05
Remove flake
ericfirth Oct 29, 2025
0e42715
Clean up Kafka DSM test mocks and improve skip helper usage
ericfirth Oct 29, 2025
d29df4c
Refactor Data Streams configuration to follow AppSec pattern
ericfirth Oct 30, 2025
b43d2f0
Change Data Streams tags parameter from Array to Hash
ericfirth Oct 30, 2025
90d7b3e
Make DataStreams.processor private
ericfirth Oct 30, 2025
d02e7d2
Add LEB128 encoding documentation to pathway_context
ericfirth Oct 30, 2025
3e209e1
Fix track_kafka_produce and track_kafka_commit to write to correct da…
ericfirth Oct 30, 2025
5342cd3
Standard fixes
ericfirth Oct 30, 2025
182ce81
Add buffering so that we don't block the main thread to do dsm work
ericfirth Oct 30, 2025
c8ff0cd
Remove unnecessary file
ericfirth Oct 30, 2025
d71238f
Remove track_kafka_commit as we dont use it
ericfirth Oct 30, 2025
0963f4d
CI Fixes due to removed methods and emptying buckets manually
ericfirth Oct 30, 2025
10c9527
Add debug logs for debugging
ericfirth Oct 31, 2025
11695d1
Fix ruby-kafka instrumentation bug
ericfirth Oct 31, 2025
a862b3b
Figure out a bug in the transport layer added with typing
ericfirth Oct 31, 2025
8cbec59
Fix DSM Kafka instrumentation bugs
ericfirth Nov 3, 2025
30b3bf9
Standard fixes
ericfirth Nov 3, 2025
1400140
Make track_kafka_* non-visible
ericfirth Nov 3, 2025
2ae6bf8
Document the initialize params for the dsm processor
ericfirth Nov 3, 2025
728b9c7
Remove a disabled from a test
ericfirth Nov 4, 2025
28 changes: 26 additions & 2 deletions Rakefile
@@ -27,6 +27,14 @@ CORE_WITH_LIBDATADOG_API = [
'spec/datadog/core/process_discovery_spec.rb',
'spec/datadog/core/configuration/stable_config_spec.rb',
'spec/datadog/core/ddsketch_spec.rb',
'spec/datadog/data_streams/**/*_spec.rb',
].freeze

# Data Streams Monitoring (DSM) requires libdatadog_api for DDSketch
# Add new instrumentation libraries here as they gain DSM support
DSM_ENABLED_LIBRARIES = [
:kafka,
:karafka
].freeze

# rubocop:disable Metrics/BlockLength
@@ -82,8 +90,8 @@ namespace :spec do
desc '' # "Explicitly hiding from `rake -T`"
RSpec::Core::RakeTask.new(:main) do |t, args|
t.pattern = 'spec/**/*_spec.rb'
-t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop}/**/*_spec.rb,' \
-' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}_spec.rb,' \
+t.exclude_pattern = 'spec/**/{appsec/integration,contrib,benchmark,redis,auto_instrument,opentelemetry,profiling,crashtracking,error_tracking,rubocop,data_streams}/**/*_spec.rb,' \
+' spec/**/{auto_instrument,opentelemetry,process_discovery,stable_config,ddsketch}*_spec.rb,' \
' spec/datadog/gem_packaging_spec.rb'
t.rspec_opts = args.to_a.join(' ')
end
@@ -295,6 +303,22 @@ namespace :spec do
end
end

# Ensure DSM-enabled contrib tests compile libdatadog_api before running (MRI Ruby only)
# If compilation fails (e.g., new Ruby version without prebuilt extension), tests will skip via DDSketch.supported?
unless RUBY_PLATFORM == 'java'
task :compile_libdatadog_for_dsm do
Rake::Task["compile:libdatadog_api.#{RUBY_VERSION[/\d+\.\d+/]}_#{RUBY_PLATFORM}"].invoke
rescue => e
# Compilation failed (likely unsupported Ruby version) - tests will skip gracefully
puts "Warning: libdatadog_api compilation failed: #{e.class}: #{e}"
puts "DSM tests will be skipped for this Ruby version"
end

DSM_ENABLED_LIBRARIES.each do |task_name|
Rake::Task["spec:#{task_name}"].enhance([:compile_libdatadog_for_dsm])
end
end

namespace :appsec do
task all: [
:main,
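The Rakefile change above attaches a compile step to each DSM-enabled spec task with `enhance` and swallows compile failures so the specs can skip on their own. A minimal sketch of that pattern follows. The task names `:compile_native` and `:spec_kafka` and the simulated failure are hypothetical and exist only for illustration; only `Rake::Task.define_task`, `Rake::Task[]`, `enhance`, and `invoke` are real Rake calls.

```ruby
require 'rake'

order = []

# Hypothetical compile task: the build fails, but the error is rescued so
# dependent tasks still run (mirroring the rescue in the diff above).
Rake::Task.define_task(:compile_native) do
  raise 'no compiler available' # simulated failure, for the sketch only
rescue => e
  order << "compile failed: #{e.message}"
end

# Hypothetical spec task standing in for spec:kafka / spec:karafka.
Rake::Task.define_task(:spec_kafka) { order << 'spec_kafka ran' }

# enhance adds a prerequisite to an already-defined task.
Rake::Task[:spec_kafka].enhance([:compile_native])
Rake::Task[:spec_kafka].invoke
```

Because the prerequisite rescues its own error, `invoke` still reaches the spec task. In the real Rakefile it is then up to the specs to skip when `DDSketch.supported?` is false.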
1 change: 1 addition & 0 deletions Steepfile
@@ -137,6 +137,7 @@ target :datadog do
ignore 'lib/datadog/core/utils/time.rb'
ignore 'lib/datadog/core/vendor/multipart-post/multipart/post/multipartable.rb'
ignore 'lib/datadog/core/worker.rb'
ignore 'lib/datadog/data_streams/configuration/settings.rb'
ignore 'lib/datadog/core/workers/async.rb'
ignore 'lib/datadog/core/workers/interval_loop.rb'
ignore 'lib/datadog/core/workers/polling.rb'
1 change: 1 addition & 0 deletions lib/datadog.rb
@@ -8,6 +8,7 @@
require_relative 'datadog/profiling'
require_relative 'datadog/appsec'
require_relative 'datadog/di'
require_relative 'datadog/data_streams'

# Line probes will not work on Ruby < 2.6 because of lack of :script_compiled
# trace point. Activate DI automatically on supported Ruby versions but
22 changes: 21 additions & 1 deletion lib/datadog/core/configuration/components.rb
@@ -19,6 +19,7 @@
require_relative '../crashtracking/component'
require_relative '../environment/agent_info'
require_relative '../process_discovery'
require_relative '../../data_streams/processor'

module Datadog
module Core
@@ -75,6 +76,20 @@ def build_crashtracker(settings, agent_settings, logger:)

Datadog::Core::Crashtracking::Component.build(settings, agent_settings, logger: logger)
end

def build_data_streams(settings, agent_settings, logger)
return unless settings.data_streams.enabled

Datadog::DataStreams::Processor.new(
interval: settings.data_streams.interval,
logger: logger,
settings: settings,
agent_settings: agent_settings
)
rescue => e
logger.warn("Failed to initialize Data Streams Monitoring: #{e.class}: #{e}")
nil
end
end

attr_reader \
@@ -90,7 +105,8 @@ def build_crashtracker(settings, agent_settings, logger:)
:error_tracking,
:dynamic_instrumentation,
:appsec,
-:agent_info
+:agent_info,
+:data_streams

def initialize(settings)
@settings = settings
@@ -126,6 +142,7 @@ def initialize(settings)
@appsec = Datadog::AppSec::Component.build_appsec_component(settings, telemetry: telemetry)
@dynamic_instrumentation = Datadog::DI::Component.build(settings, agent_settings, @logger, telemetry: telemetry)
@error_tracking = Datadog::ErrorTracking::Component.build(settings, @tracer, @logger)
@data_streams = self.class.build_data_streams(settings, agent_settings, @logger)
@environment_logger_extra[:dynamic_instrumentation_enabled] = !!@dynamic_instrumentation

# Configure non-privileged components.
@@ -195,6 +212,9 @@ def shutdown!(replacement = nil)
# Shutdown workers
runtime_metrics.stop(true, close_metrics: false)

# Shutdown Data Streams Monitoring processor
data_streams&.stop(true)

# Shutdown the old metrics, unless they are still being used.
# (e.g. custom Statsd instances.)
#
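The `build_data_streams` hunk above returns `nil` in two cases: when the feature is disabled, and when construction raises. `shutdown!` then relies on `&.` so a missing processor is a no-op. The sketch below reproduces that guarded-build pattern in isolation. `SketchProcessor`, `ListLogger`, and `build_component` are hypothetical names invented for this example and are not part of the library.

```ruby
# Hypothetical processor: refuses to start with a non-positive interval.
class SketchProcessor
  attr_reader :stopped

  def initialize(interval:)
    raise ArgumentError, 'interval must be positive' unless interval.positive?

    @interval = interval
    @stopped = false
  end

  def stop(_force = false)
    @stopped = true
  end
end

# Hypothetical logger that records warnings in memory.
class ListLogger
  attr_reader :warnings

  def initialize
    @warnings = []
  end

  def warn(message)
    @warnings << message
  end
end

# Returns a processor, or nil when disabled or when construction fails.
def build_component(enabled:, interval:, logger:)
  return unless enabled

  SketchProcessor.new(interval: interval)
rescue => e
  logger.warn("Failed to initialize: #{e.class}: #{e}")
  nil
end

logger = ListLogger.new
disabled = build_component(enabled: false, interval: 10.0, logger: logger)
broken = build_component(enabled: true, interval: -1.0, logger: logger)
working = build_component(enabled: true, interval: 10.0, logger: logger)

# Safe navigation makes shutdown a no-op for the nil components.
[disabled, broken, working].each { |component| component&.stop(true) }
```

The trade-off in this design is that a misconfigured component degrades to "off" with a warning rather than aborting startup, so callers must treat the component as optional everywhere.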
@@ -33,6 +33,7 @@ module Configuration
"DD_APPSEC_WAF_DEBUG" => {version: ["A"]},
"DD_APPSEC_WAF_TIMEOUT" => {version: ["A"]},
"DD_CRASHTRACKING_ENABLED" => {version: ["A"]},
"DD_DATA_STREAMS_ENABLED" => {version: ["A"]},
"DD_DBM_PROPAGATION_MODE" => {version: ["A"]},
"DD_DISABLE_DATADOG_RAILS" => {version: ["A"]},
"DD_DYNAMIC_INSTRUMENTATION_ENABLED" => {version: ["A"]},
2 changes: 0 additions & 2 deletions lib/datadog/core/ddsketch.rb
@@ -1,7 +1,5 @@
# frozen_string_literal: true

-require 'datadog/core'
-
module Datadog
module Core
# Used to access ddsketch APIs.
100 changes: 100 additions & 0 deletions lib/datadog/data_streams.rb
@@ -0,0 +1,100 @@
# frozen_string_literal: true

require_relative 'data_streams/processor'
require_relative 'data_streams/pathway_context'
require_relative 'data_streams/configuration/settings'
require_relative 'data_streams/extensions'
require_relative 'core/utils/time'

module Datadog
# Datadog Data Streams Monitoring public API.
#
# The Datadog team ensures that public methods in this module
# only receive backwards compatible changes, and breaking changes
# will only occur in new major version releases.
# @public_api
module DataStreams
class << self
# Set a produce checkpoint for Data Streams Monitoring
#
# @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')
# @param destination [String] The destination (e.g., topic, exchange, stream name)
# @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false)
# @param tags [Hash] Additional tags to include
# @yield [key, value] Block to inject context into carrier
# @return [String, nil] Base64 encoded pathway context or nil if disabled
# @public_api
def set_produce_checkpoint(type:, destination:, auto_instrumentation: false, tags: {}, &block)
processor&.set_produce_checkpoint(
type: type,
destination: destination,
manual_checkpoint: !auto_instrumentation,
tags: tags,
&block
)
end

# Set a consume checkpoint for Data Streams Monitoring
#
# @param type [String] The type of the checkpoint (e.g., 'kafka', 'kinesis', 'sns')
# @param source [String] The source (e.g., topic, exchange, stream name)
# @param auto_instrumentation [Boolean] Whether this checkpoint was set by auto-instrumentation (default: false)
# @param tags [Hash] Additional tags to include
# @yield [key] Block to extract context from carrier
# @return [String, nil] Base64 encoded pathway context or nil if disabled
# @public_api
def set_consume_checkpoint(type:, source:, auto_instrumentation: false, tags: {}, &block)
processor&.set_consume_checkpoint(
type: type,
source: source,
manual_checkpoint: !auto_instrumentation,
tags: tags,
&block
)
end

# Track Kafka produce offset for lag monitoring
#
# @param topic [String] The Kafka topic name
# @param partition [Integer] The partition number
# @param offset [Integer] The offset of the produced message
# @return [Boolean, nil] true if tracking succeeded, nil if disabled
# @public_api
def track_kafka_produce(topic, partition, offset)
processor&.track_kafka_produce(topic, partition, offset, Core::Utils::Time.now)
end

# Track Kafka message consumption for consumer lag monitoring
#
# @param topic [String] The Kafka topic name
# @param partition [Integer] The partition number
# @param offset [Integer] The offset of the consumed message
# @return [Boolean, nil] true if tracking succeeded, nil if disabled
# @public_api
def track_kafka_consume(topic, partition, offset)
processor&.track_kafka_consume(topic, partition, offset, Core::Utils::Time.now)
end

# Check if Data Streams Monitoring is enabled and available
#
# @return [Boolean] true if the processor is available
# @public_api
def enabled?
!processor.nil?
end

private

def processor
components.data_streams
end

def components
Datadog.send(:components)
end
end

# Expose Data Streams to global shared objects
Extensions.activate!
end
end
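The doc comments above say the produce block is yielded `[key, value]` to inject context into a carrier, and the consume block is yielded `[key]` to read it back. The sketch below shows how a caller might wire those blocks to a message-headers hash. `FakeDataStreams` is a hypothetical stand-in that copies only the keyword signatures from the diff so the example runs without the gem. Its encoding and the `example-pathway-ctx` header name are assumptions made for illustration, not the library's actual wire format.

```ruby
# Hypothetical stand-in mirroring the keyword signatures in the diff above.
# The real module delegates to a processor; this fake only encodes strings.
module FakeDataStreams
  CONTEXT_KEY = 'example-pathway-ctx' # assumed header name, for the sketch only

  def self.set_produce_checkpoint(type:, destination:, auto_instrumentation: false, tags: {})
    encoded = ["#{type}:#{destination}"].pack('m0') # strict Base64
    yield(CONTEXT_KEY, encoded) if block_given?
    encoded
  end

  def self.set_consume_checkpoint(type:, source:, auto_instrumentation: false, tags: {})
    incoming = block_given? ? yield(CONTEXT_KEY) : nil
    parent = incoming&.unpack1('m0')
    [[parent, "#{type}:#{source}"].compact.join('>')].pack('m0')
  end
end

# Producer side: the block writes the context into outgoing headers.
headers = {}
FakeDataStreams.set_produce_checkpoint(type: 'kafka', destination: 'orders') do |key, value|
  headers[key] = value
end

# Consumer side: the block reads the context back out of the headers.
consumed = FakeDataStreams.set_consume_checkpoint(type: 'kafka', source: 'orders') do |key|
  headers[key]
end
```

With the real module, the same two blocks would be passed to `Datadog::DataStreams.set_produce_checkpoint` and `Datadog::DataStreams.set_consume_checkpoint`. Both return `nil` when the processor is unavailable, so callers should not depend on the return value.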
11 changes: 11 additions & 0 deletions lib/datadog/data_streams/configuration.rb
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require_relative 'configuration/settings'

module Datadog
module DataStreams
# Configuration for Data Streams Monitoring
module Configuration
end
end
end
49 changes: 49 additions & 0 deletions lib/datadog/data_streams/configuration/settings.rb
@@ -0,0 +1,49 @@
# frozen_string_literal: true

require_relative '../../core/environment/variable_helpers'
require_relative '../ext'

module Datadog
module DataStreams
module Configuration
# Configuration settings for Data Streams Monitoring.
module Settings
def self.extended(base)
base = base.singleton_class unless base.is_a?(Class)
add_settings!(base)
end

def self.add_settings!(base)
base.class_eval do
# Data Streams Monitoring configuration
# @public_api
settings :data_streams do
# Whether Data Streams Monitoring is enabled. When enabled, the library will
# collect and report data lineage information for messaging systems.
#
# @default `DD_DATA_STREAMS_ENABLED` environment variable, otherwise `false`.
# @return [Boolean]
option :enabled do |o|
o.type :bool
o.env Ext::ENV_ENABLED
o.default false
end

# The interval (in seconds) at which Data Streams Monitoring stats are flushed.
#
# @default 10.0
# @env '_DD_TRACE_STATS_WRITER_INTERVAL'
# @return [Float]
# @!visibility private
option :interval do |o|
o.type :float
o.env '_DD_TRACE_STATS_WRITER_INTERVAL'
o.default 10.0
end
end
end
end
end
end
end
end
11 changes: 11 additions & 0 deletions lib/datadog/data_streams/ext.rb
@@ -0,0 +1,11 @@
# frozen_string_literal: true

module Datadog
module DataStreams
# Constants for Data Streams Monitoring configuration
# @public_api
module Ext
ENV_ENABLED = 'DD_DATA_STREAMS_ENABLED'
end
end
end
16 changes: 16 additions & 0 deletions lib/datadog/data_streams/extensions.rb
@@ -0,0 +1,16 @@
# frozen_string_literal: true

require_relative 'configuration'
require_relative '../core/configuration'

module Datadog
module DataStreams
# Extends Datadog with Data Streams Monitoring features
module Extensions
# Inject Data Streams settings into global configuration.
def self.activate!
Core::Configuration::Settings.extend(Configuration::Settings)
end
end
end
end
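`Extensions.activate!` works because `Configuration::Settings` defines a `self.extended` hook: extending the core settings class triggers the hook, which reopens the class and declares the new options. A minimal sketch of that mechanism follows. `SketchSettings`, its tiny `option` DSL, and `SketchFeatureSettings` are hypothetical and far simpler than the library's real option system.

```ruby
# Hypothetical settings class with a tiny `option` DSL (reader + writer with default).
class SketchSettings
  def self.option(name, default:)
    define_method(name) do
      value = instance_variable_get("@#{name}")
      value.nil? ? default : value
    end
    define_method("#{name}=") { |value| instance_variable_set("@#{name}", value) }
  end
end

# Hypothetical feature module: extending a class with it injects new options.
module SketchFeatureSettings
  def self.extended(base)
    # Same guard as the diff: accept either a class or an instance.
    base = base.singleton_class unless base.is_a?(Class)
    base.class_eval do
      option :feature_enabled, default: false
      option :flush_interval, default: 10.0
    end
  end
end

# Equivalent in spirit to Core::Configuration::Settings.extend(Configuration::Settings).
SketchSettings.extend(SketchFeatureSettings)

settings = SketchSettings.new
settings.feature_enabled = true
```

The hook keeps the feature's settings in the feature's own namespace while still exposing them on the shared settings object, which is the AppSec-style pattern the commit list refers to.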