Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions libdd-data-pipeline/benches/trace_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,64 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use libdd_data_pipeline::trace_buffer::{Export, TraceBuffer, TraceBufferConfig, TraceChunk};
use libdd_data_pipeline::trace_exporter::{
agent_response::AgentResponse, error::TraceExporterError,
};
use libdd_shared_runtime::SharedRuntime;

type Span = [u8; 100];
use libdd_tinybytes::BytesString;
use libdd_trace_utils::span::v04::SpanBytes;

// Number of chunks each sender thread sends per benchmark iteration.
const CHUNKS_PER_SENDER: usize = 900;

/// Convenience shorthand: builds a `BytesString` from a `'static` string
/// literal via `BytesString::from_static`. Used below to keep the span
/// fixture in `make_span` readable.
fn bs(s: &'static str) -> BytesString {
    BytesString::from_static(s)
}

/// Builds a fixed, representative v04 span for the benchmark workload.
///
/// The values model a plausible web-service request span (service name,
/// HTTP resource, a handful of meta tags and two sampling metrics) so the
/// buffered payload exercised by the benchmark resembles real traffic.
/// Every call returns an identical span.
fn make_span() -> SpanBytes {
    // Tag map: five common string tags.
    let meta = HashMap::from([
        (bs("env"), bs("prod")),
        (bs("version"), bs("1.0.0")),
        (bs("http.method"), bs("GET")),
        (bs("http.url"), bs("/api/v1/users")),
        (bs("peer.service"), bs("users-service")),
    ]);
    // Numeric metrics: sampling-related entries, both set to 1.0.
    let metrics = HashMap::from([
        (bs("_sampling_priority_v1"), 1.0_f64),
        (bs("_dd.agent_psr"), 1.0_f64),
    ]);
    SpanBytes {
        service: bs("my-web-service"),
        name: bs("http.request"),
        resource: bs("GET /api/v1/users"),
        r#type: bs("web"),
        trace_id: 1_234_567_890_123_456_789_u128,
        span_id: 987_654_321_u64,
        parent_id: 0,
        start: 1_700_000_000_000_000_000_i64,
        duration: 5_000_000_i64,
        error: 0,
        meta,
        metrics,
        meta_struct: HashMap::new(),
        span_links: vec![],
        span_events: vec![],
    }
}

// Simulates async IO by sleeping 2ms per export batch.
#[derive(Debug)]
struct SleepExport;

impl Export<Span> for SleepExport {
impl Export<SpanBytes> for SleepExport {
fn export_trace_chunks(
&mut self,
_trace_chunks: Vec<TraceChunk<Span>>,
_trace_chunks: Vec<TraceChunk<SpanBytes>>,
) -> Pin<
Box<
dyn std::future::Future<Output = Result<AgentResponse, TraceExporterError>> + Send + '_,
Expand All @@ -37,11 +71,11 @@ impl Export<Span> for SleepExport {
}
}

fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<Span>>) {
fn setup_buffer() -> (Arc<SharedRuntime>, Arc<TraceBuffer<SpanBytes>>) {
let rt = Arc::new(SharedRuntime::new().expect("SharedRuntime::new"));
let cfg = TraceBufferConfig::new()
.max_buffered_spans(1_000)
.span_flush_threshold(500)
.max_buffered_bytes(1_000_000)
.flush_threshold_bytes(100_000)
.max_flush_interval(Duration::from_secs(2));
let (buf, worker) = TraceBuffer::new(cfg, Box::new(|_| {}), Box::new(SleepExport));
rt.spawn_worker(worker, true).expect("spawn_worker");
Expand Down Expand Up @@ -69,22 +103,32 @@ fn bench_trace_buffer(c: &mut Criterion) {
group.bench_function(
BenchmarkId::new(format!("{}_senders", num_senders), delay_label),
|b| {
b.iter(|| {
std::thread::scope(|s| {
for _ in 0..num_senders {
let sender = sender.clone();
s.spawn(move || {
for _ in 0..CHUNKS_PER_SENDER {
// BatchFull errors are expected under high load.
let _ = sender.send_chunk(vec![[0u8; 100]]);
if let Some(d) = delay {
std::thread::sleep(d);
b.iter_batched(
|| {
Vec::from_iter(
(0..num_senders)
.map(|_| (0..CHUNKS_PER_SENDER).map(|_| vec![make_span()]))
.map(Vec::from_iter),
)
},
|input| {
std::thread::scope(|s| {
for sender_spans in input {
let sender = sender.clone();
s.spawn(move || {
for s in sender_spans {
// BatchFull errors are expected under high load.
let _ = sender.send_chunk(s);
if let Some(d) = delay {
std::thread::sleep(d);
}
}
}
});
}
});
});
});
}
});
},
BatchSize::PerIteration,
);
},
);

Expand Down
Loading
Loading