Skip to content

Commit 73a6044

Browse files
committed
Fix flaky data_streams bucket aggregation test
Fixes race condition in 'aggregates multiple checkpoints into DDSketch histograms' test. Root Cause: The test captured 'now = Time.now.to_f' at the start, but the checkpoints created immediately after used their own Core::Utils::Time.now internally. These timestamps could differ by milliseconds, causing checkpoints to be bucketed into a different time window than the test expected. When the test calculated the expected bucket: bucket_time_ns = now_ns - (now_ns % processor.bucket_size_ns) And looked it up: bucket = processor.buckets[bucket_time_ns] The bucket would be nil if the checkpoints used a slightly different timestamp and landed in an adjacent bucket. Solution: Use Core::Utils::Time.now_provider to inject a fixed timestamp (Time.utc(2000, 1, 1, 0, 0, 0)) for the entire test. This ensures all checkpoints use the exact same deterministic timestamp, eliminating the race condition completely. The test logic remains unchanged - we're just controlling the time source to make it deterministic rather than dependent on wall clock timing. Related to incident #46145. Seed that reproduced the flake: 61335.
1 parent f045f9d commit 73a6044

File tree

1 file changed

+38
-31
lines changed

1 file changed

+38
-31
lines changed

spec/datadog/data_streams/processor_spec.rb

Lines changed: 38 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -186,47 +186,54 @@
186186

187187
describe 'internal bucket aggregation' do
188188
it 'aggregates multiple checkpoints into DDSketch histograms' do
189-
now = Time.now.to_f
189+
fixed_time = Time.utc(2000, 1, 1, 0, 0, 0)
190+
allow(Datadog::Tracing).to receive(:active_span).and_return(nil)
190191

191-
# Create multiple checkpoints with the same tags to aggregate
192-
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
193-
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
194-
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
192+
begin
193+
Datadog::Core::Utils::Time.now_provider = -> { fixed_time }
195194

196-
# Flush the event buffer to process checkpoints
197-
processor.send(:process_events)
195+
# Create multiple checkpoints with the same tags to aggregate
196+
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
197+
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
198+
processor.set_produce_checkpoint(type: 'kafka', destination: 'topicA', manual_checkpoint: false)
198199

199-
# Access internal buckets to verify aggregation
200-
expect(processor.buckets).not_to be_empty
200+
# Flush the event buffer to process checkpoints
201+
processor.send(:process_events)
201202

202-
# Find the bucket for this time window
203-
now_ns = (now * 1e9).to_i
204-
bucket_time_ns = now_ns - (now_ns % processor.bucket_size_ns)
203+
# Access internal buckets to verify aggregation
204+
expect(processor.buckets).not_to be_empty
205205

206-
bucket = processor.buckets[bucket_time_ns]
207-
expect(bucket).not_to be_nil
206+
# Find the bucket for this time window
207+
now_ns = (fixed_time.to_f * 1e9).to_i
208+
bucket_time_ns = now_ns - (now_ns % processor.bucket_size_ns)
208209

209-
# Verify stats were aggregated for this pathway
210-
pathway_stats = bucket[:pathway_stats]
211-
expect(pathway_stats).not_to be_empty
210+
bucket = processor.buckets[bucket_time_ns]
211+
expect(bucket).not_to be_nil
212212

213-
# At least one aggregation key should exist
214-
aggr_key = pathway_stats.keys.first
215-
stats = pathway_stats[aggr_key]
213+
# Verify stats were aggregated for this pathway
214+
pathway_stats = bucket[:pathway_stats]
215+
expect(pathway_stats).not_to be_empty
216216

217-
# Verify DDSketch objects were populated
218-
expect(stats[:edge_latency]).to be_a(Datadog::Core::DDSketch)
219-
expect(stats[:full_pathway_latency]).to be_a(Datadog::Core::DDSketch)
217+
# At least one aggregation key should exist
218+
aggr_key = pathway_stats.keys.first
219+
stats = pathway_stats[aggr_key]
220220

221-
# Verify exactly 3 samples were recorded (matching Python test)
222-
expect(stats[:edge_latency].count).to eq(3)
223-
expect(stats[:full_pathway_latency].count).to eq(3)
221+
# Verify DDSketch objects were populated
222+
expect(stats[:edge_latency]).to be_a(Datadog::Core::DDSketch)
223+
expect(stats[:full_pathway_latency]).to be_a(Datadog::Core::DDSketch)
224224

225-
# Verify sketches can be encoded for serialization
226-
expect(stats[:edge_latency].encode).to be_a(String)
227-
expect(stats[:edge_latency].encode).not_to be_empty
228-
expect(stats[:full_pathway_latency].encode).to be_a(String)
229-
expect(stats[:full_pathway_latency].encode).not_to be_empty
225+
# Verify exactly 3 samples were recorded (matching Python test)
226+
expect(stats[:edge_latency].count).to eq(3)
227+
expect(stats[:full_pathway_latency].count).to eq(3)
228+
229+
# Verify sketches can be encoded for serialization
230+
expect(stats[:edge_latency].encode).to be_a(String)
231+
expect(stats[:edge_latency].encode).not_to be_empty
232+
expect(stats[:full_pathway_latency].encode).to be_a(String)
233+
expect(stats[:full_pathway_latency].encode).not_to be_empty
234+
ensure
235+
Datadog::Core::Utils::Time.now_provider = -> { ::Time.now }
236+
end
230237
end
231238
end
232239
end

0 commit comments

Comments
 (0)