Skip to content

Commit 11b9ae1

Browse files
authored
Merge pull request #4154 from DataDog/dedup-logs
Telemetry: Deduplicate log entries
2 parents 61ca49f + fcde76a commit 11b9ae1

File tree

10 files changed

+402
-34
lines changed

10 files changed

+402
-34
lines changed

lib/datadog/core/telemetry/event.rb

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,22 @@ def type
2929
def payload
3030
{}
3131
end
32+
33+
# Override equality to allow for deduplication
34+
# The basic implementation is to check if the other object is an instance of the same class.
35+
# This works for events that have no attributes.
36+
# For events with attributes, you should override this method to compare the attributes.
37+
def ==(other)
  # Require the exact same class (not a subclass): this keeps equality
  # symmetric and guarantees that two `eql?` events also share the same
  # #hash, which is derived from the class alone.
  other.instance_of?(self.class)
end
40+
41+
# @see #==
42+
alias eql? ==
43+
44+
# @see #==
45+
# Hash code for attribute-less events: taken from the event's class only.
def hash
  event_class = self.class
  event_class.hash
end
3248
end
3349

3450
# Telemetry class for the 'app-started' event
@@ -263,6 +279,8 @@ def patch_error(integration)
263279

264280
# Telemetry class for the 'app-client-configuration-change' event
265281
class AppClientConfigurationChange < Base
282+
attr_reader :changes, :origin
283+
266284
# Event type name for this event: 'app-client-configuration-change'.
def type
267285
'app-client-configuration-change'
268286
end
@@ -301,6 +319,16 @@ def configuration
301319

302320
res
303321
end
322+
323+
# Two configuration-change events are equal when they are of exactly the
# same class and carry the same changes and origin.
def ==(other)
  # Exact class match (instead of is_a?) keeps equality symmetric and in
  # step with #hash, which mixes in self.class.
  other.instance_of?(self.class) && other.changes == @changes && other.origin == @origin
end
326+
327+
alias eql? ==
328+
329+
# Hash code built from the class, the changes and the origin.
def hash
  identity = [self.class, @changes, @origin]
  identity.hash
end
304332
end
305333

306334
# Telemetry class for the 'app-heartbeat' event
@@ -319,6 +347,8 @@ def type
319347

320348
# Telemetry class for the 'generate-metrics' event
321349
class GenerateMetrics < Base
350+
attr_reader :namespace, :metric_series
351+
322352
# Event type name for this event: 'generate-metrics'.
def type
323353
'generate-metrics'
324354
end
@@ -335,24 +365,54 @@ def payload
335365
series: @metric_series.map(&:to_h)
336366
}
337367
end
368+
369+
# Two metrics events are equal when they are of exactly the same class and
# carry the same namespace and metric series.
def ==(other)
  # Exact class match (instead of is_a?(GenerateMetrics)) so that a subclass
  # instance is never `eql?` to a parent-class instance: #hash mixes in
  # self.class, so such a pair would otherwise be equal with different hashes.
  other.instance_of?(self.class) && other.namespace == @namespace && other.metric_series == @metric_series
end
372+
373+
alias eql? ==
374+
375+
# Hash code built from the class, the namespace and the metric series.
def hash
  identity = [self.class, @namespace, @metric_series]
  identity.hash
end
338378
end
339379

340-
# Telemetry class for the 'logs' event
380+
# Telemetry class for the 'logs' event.
381+
# Logs with the same content are deduplicated at flush time.
341382
class Log < Base
342383
LEVELS = {
343384
error: 'ERROR',
344385
warn: 'WARN',
345386
}.freeze
346387

388+
LEVELS_STRING = LEVELS.values.freeze
389+
390+
attr_reader :message, :level, :stack_trace, :count
391+
347392
# Event type name for this event: 'logs'.
def type
348393
'logs'
349394
end
350395

351-
def initialize(message:, level:, stack_trace: nil)
396+
# @param message [String] the log message
397+
# @param level [Symbol, String] the log level. Either :error, :warn, 'ERROR', or 'WARN'.
398+
# @param stack_trace [String, nil] the stack trace
399+
# @param count [Integer] the number of times the log was emitted. Used for deduplication.
400+
def initialize(message:, level:, stack_trace: nil, count: 1)
  super()
  @message = message
  @stack_trace = stack_trace
  @count = count

  @level =
    case level
    when String
      # Already-normalized level, used when a log is copied during deduplication
      raise ArgumentError, "Invalid log level #{level}" unless LEVELS_STRING.include?(level)

      level
    when Symbol
      # Symbolic level, as passed by regular log emitters
      LEVELS.fetch(level) { |k| raise ArgumentError, "Invalid log level :#{k}" }
    else
      raise ArgumentError, "Invalid log level #{level}"
    end
end
357417

358418
def payload
@@ -362,10 +422,24 @@ def payload
362422
message: @message,
363423
level: @level,
364424
stack_trace: @stack_trace,
425+
count: @count,
365426
}.compact
366427
]
367428
}
368429
end
430+
431+
# override equality to allow for deduplication
432+
def ==(other)
  # Exact class match (instead of is_a?(Log)) keeps equality symmetric and in
  # step with #hash, which mixes in self.class. The count is part of the
  # comparison, so entries with different counts are distinct.
  other.instance_of?(self.class) &&
    other.message == @message &&
    other.level == @level &&
    other.stack_trace == @stack_trace &&
    other.count == @count
end
437+
438+
alias eql? ==
439+
440+
# Hash code built from the class plus the same fields compared by #==.
def hash
  identity = [self.class, @message, @level, @stack_trace, @count]
  identity.hash
end
369443
end
370444

371445
# Telemetry class for the 'distributions' event
@@ -395,6 +469,16 @@ def payload
395469
}
396470
end
397471
end
472+
473+
# Two batches are equal when they are of exactly the same class and hold
# equal events.
def ==(other)
  # Exact class match (instead of is_a?) keeps equality symmetric and in
  # step with #hash, which mixes in self.class.
  other.instance_of?(self.class) && other.events == @events
end
476+
477+
alias eql? ==
478+
479+
# Hash code built from the class and the batched events.
def hash
  identity = [self.class, @events]
  identity.hash
end
398482
end
399483
end
400484
end

lib/datadog/core/telemetry/metric.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ def to_h
4141
}
4242
end
4343

44+
# Metrics are equal when `other` is of this metric's class and every
# compared field (name, values, tags, common, type) matches.
def ==(other)
  return false unless other.is_a?(self.class)
  return false unless name == other.name && values == other.values
  return false unless tags == other.tags && common == other.common

  type == other.type
end
49+
50+
alias eql? ==
51+
52+
# Hash code built from the class plus the same fields compared by #==.
def hash
  identity = [self.class, name, values, tags, common, type]
  identity.hash
end
55+
4456
private
4557

4658
def tags_to_array(tags)
@@ -71,6 +83,16 @@ def to_h
7183
res[:interval] = interval
7284
res
7385
end
86+
87+
# Adds the interval to the inherited equality check.
def ==(other)
  return false unless super

  interval == other.interval
end
90+
91+
alias eql? ==
92+
93+
# Combines the inherited hash code with the interval.
def hash
  inherited = super
  [inherited, interval].hash
end
7496
end
7597

7698
# Count metric adds up all the submitted values in a time interval. This would be suitable for a

lib/datadog/core/telemetry/worker.rb

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ def flush_events(events)
9797
return if events.empty?
9898
return if !enabled? || !sent_started_event?
9999

100+
events = deduplicate_logs(events)
101+
100102
Datadog.logger.debug { "Sending #{events&.count} telemetry events" }
101103
send_event(Event::MessageBatch.new(events))
102104
end
@@ -167,6 +169,37 @@ def disable_on_not_found!(response)
167169
Datadog.logger.debug('Agent does not support telemetry; disabling future telemetry events.')
168170
disable!
169171
end
172+
173+
# Deduplicate logs by counting the number of repeated occurrences of the same log
174+
# entry and replacing them with a single entry with the calculated `count` value.
175+
# Non-log events are unchanged.
176+
# @param events [Array] telemetry events to flush
# @return [Array] non-log events in their original order, followed by one
#   log entry per distinct log. Returns `events` itself when it is empty or
#   contains no logs.
def deduplicate_logs(events)
  return events if events.empty?

  all_logs, other_events = events.partition { |event| event.is_a?(Event::Log) }
  return events if all_logs.empty?

  uniq_logs = all_logs.group_by(&:itself).map do |_, logs|
    log = logs.first
    next log if logs.size == 1

    # New log event carrying the total number of occurrences. Counts are
    # summed (rather than using the group size) so that entries which already
    # carry a count greater than 1 are not under-reported.
    Event::Log.new(
      message: log.message,
      level: log.level,
      stack_trace: log.stack_trace,
      count: logs.sum(&:count)
    )
  end

  other_events + uniq_logs
end
170203
end
171204
end
172205
end

sig/datadog/core/telemetry/component.rbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ module Datadog
2020

2121
def disable!: () -> void
2222

23-
def client_configuration_change!: (Enumerable[[String, Numeric | bool | String]] changes) -> void
23+
def client_configuration_change!: (Array[[String, Numeric | bool | String]] changes) -> void
2424

2525
def emit_closing!: () -> void
2626

sig/datadog/core/telemetry/event.rbs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ module Datadog
4646
end
4747

4848
class AppClientConfigurationChange < Base
49-
@changes: Enumerable[[String, Numeric | bool | String | int]]
50-
@origin: String
49+
attr_reader changes: Array[[String, Numeric | bool | String | int]]
50+
attr_reader origin: String
5151

52-
def initialize: (Enumerable[[String, Numeric | bool | String]] changes, String origin) -> void
52+
def initialize: (Array[[String, Numeric | bool | String]] changes, String origin) -> void
5353

5454
def configuration: () -> Array[Hash[Symbol, untyped]]
5555
end
@@ -61,22 +61,25 @@ module Datadog
6161
end
6262

6363
class GenerateMetrics < Base
64-
@namespace: String
65-
@metric_series: Enumerable[Datadog::Core::Telemetry::Metric::Base]
64+
attr_reader namespace: String
65+
attr_reader metric_series: Array[Metric::Base]
6666

67-
def initialize: (String namespace, Enumerable[Datadog::Core::Telemetry::Metric::Base] metric_series) -> void
67+
def initialize: (String namespace, Array[Metric::Base] metric_series) -> void
6868
end
6969

7070
class Log < Base
7171
LEVELS: Hash[Symbol, String]
7272

73-
@message: String
74-
@level: "ERROR" | "DEBUG" | "WARN"
75-
@stack_trace: String?
73+
LEVELS_STRING: Array[String]
7674

77-
def initialize: (message: String, level: Symbol, ?stack_trace: String?) -> void
75+
attr_reader count: Integer
76+
attr_reader message: String
77+
attr_reader level: String
78+
attr_reader stack_trace: String?
7879

79-
def payload: () -> { logs: [Hash[Symbol, String]] }
80+
def initialize: (message: String, level: (Symbol|String), ?stack_trace: String?, ?count: Integer) -> void
81+
82+
def payload: () -> { logs: [Hash[Symbol, (String|Integer)]] }
8083
end
8184

8285
class Distributions < GenerateMetrics

sig/datadog/core/telemetry/worker.rbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ module Datadog
3535

3636
private
3737

38+
def deduplicate_logs: (Array[Event::Base] events) -> Array[Event::Base]
39+
3840
def heartbeat!: () -> void
3941

4042
def started!: () -> void

0 commit comments

Comments
 (0)