Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn handle_v02_traces(State(state): State<TracesState>, body: Bytes) ->
}
};

match state.merge_agent_payload(payload) {
match state.merge_agent_payload(payload, &body[..]) {
Ok(()) => {
info!("Processed trace payload.");
StatusCode::ACCEPTED
Expand Down
10 changes: 8 additions & 2 deletions bin/correctness/datadog-intake/src/app/traces/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ impl TracesState {
}

/// Merges the given agent payload into the current traces state.
pub fn merge_agent_payload(&self, payload: AgentPayload) -> Result<(), GenericError> {
let new_spans = Span::get_spans_from_agent_payload(&payload);
///
/// Both the classic `tracerPayloads` (field 5) and the idx `idxTracerPayloads` (field 11)
/// paths are decoded. `raw_body` must be the original protobuf-encoded bytes of the
/// `AgentPayload` and is used to decode field 11 directly, bypassing the incorrectly typed
/// generated field in the `AgentPayload` struct.
pub fn merge_agent_payload(&self, payload: AgentPayload, raw_body: &[u8]) -> Result<(), GenericError> {
let mut new_spans = Span::get_spans_from_agent_payload(&payload);
new_spans.extend(Span::get_spans_from_idx_bytes(&payload, raw_body));
Comment on lines +45 to +46
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these always mutually exclusive depending on payload version?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Structurally no, but functionally, at the moment, yes: the trace-agent will only ever emit one or the other in a single agent payload. I'd have to double-check whether trace intake can support receiving both at the same time, though.

let mut inner = self.inner.lock().unwrap();
inner.spans.extend(new_spans);

Expand Down
279 changes: 278 additions & 1 deletion bin/correctness/stele/src/traces/tracer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use datadog_protos::traces as proto;
use datadog_protos::traces::idx as idx_proto;
use ordered_float::OrderedFloat;
use protobuf::CodedInputStream;
use saluki_common::collections::FastHashMap;
use serde::{Deserialize, Serialize};
use stringtheory::MetaString;
Expand Down Expand Up @@ -86,6 +88,23 @@ impl From<&proto::TracerPayload> for TracerMetadata {
}
}

impl TracerMetadata {
    /// Builds tracer-level metadata from an idx `TracerPayload`.
    ///
    /// Each `*Ref` field on the payload is looked up in the payload's own `strings` table via
    /// `resolve_ref`, and `tags` is filled from the payload attributes by `string_attrs_from_idx`.
    fn from_idx_payload(payload: &idx_proto::TracerPayload) -> Self {
        let table = &payload.strings;
        let lookup = |index: u32| resolve_ref(table, index);

        Self {
            container_id: lookup(payload.containerIDRef),
            language_name: lookup(payload.languageNameRef),
            language_version: lookup(payload.languageVersionRef),
            tracer_version: lookup(payload.tracerVersionRef),
            runtime_id: lookup(payload.runtimeIDRef),
            tags: string_attrs_from_idx(&payload.attributes, table),
            env: lookup(payload.envRef),
            hostname: lookup(payload.hostnameRef),
            app_version: lookup(payload.appVersionRef),
        }
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct TraceChunkMetadata {
priority: i32,
Expand All @@ -105,6 +124,17 @@ impl From<&proto::TraceChunk> for TraceChunkMetadata {
}
}

impl TraceChunkMetadata {
    /// Builds chunk-level metadata from an idx `TraceChunk`.
    ///
    /// `strings` is the string table that the chunk's `originRef` and attribute references
    /// index into.
    fn from_idx_chunk(chunk: &idx_proto::TraceChunk, strings: &[String]) -> Self {
        let origin = resolve_ref(strings, chunk.originRef);
        let tags = string_attrs_from_idx(&chunk.attributes, strings);

        Self {
            priority: chunk.priority,
            origin,
            tags,
            dropped_trace: chunk.droppedTrace,
        }
    }
}

/// A simplified span representation.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct Span {
Expand Down Expand Up @@ -144,7 +174,8 @@ impl Span {
self.meta.get(meta_key).map(|s| &**s)
}

/// Gets all spans from the given `AgentPayload`.
/// Gets all spans from the given `AgentPayload`, reading from the classic `tracerPayloads`
/// field (proto field 5).
pub fn get_spans_from_agent_payload(payload: &proto::AgentPayload) -> Vec<Self> {
let agent_metadata = AgentMetadata::from(payload);

Expand All @@ -170,6 +201,53 @@ impl Span {
spans
}

/// Gets all spans from the raw AgentPayload bytes by decoding the idx `idxTracerPayloads`
/// field (proto field 11).
///
/// The generated `AgentPayload` struct has the wrong Rust type for field 11 (it falls back
/// to the classic `TracerPayload` because the idx types were not in the same codegen
/// invocation), so this function reads field 11 directly from the raw wire bytes using
/// `CodedInputStream` and decodes each message as the correct `idx::TracerPayload` type.
///
/// Decoding is best-effort and never fails:
///
/// - an `idxTracerPayloads` entry whose bytes do not decode as an `idx::TracerPayload` is
///   skipped, and scanning continues with the next top-level field;
/// - if the outer stream itself cannot be read (bad tag, truncated field), scanning stops and
///   the spans collected so far are returned.
///
/// `payload` is only used to derive the agent-level metadata attached to every span; `body`
/// must be the protobuf-encoded bytes that `payload` was decoded from.
pub fn get_spans_from_idx_bytes(payload: &proto::AgentPayload, body: &[u8]) -> Vec<Self> {
    // AgentPayload.idxTracerPayloads = field 11, wire type LEN (2) → tag = 90
    const IDX_TRACER_PAYLOADS_TAG: u32 = (11 << 3) | 2;

    let agent_metadata = AgentMetadata::from(payload);
    let mut spans = Vec::new();

    let mut is = CodedInputStream::from_bytes(body);
    while let Ok(Some(tag)) = is.read_raw_tag_or_eof() {
        if tag != IDX_TRACER_PAYLOADS_TAG {
            // A field that cannot be skipped leaves the stream at an unknown offset, so there
            // is no safe way to keep scanning.
            if protobuf::rt::skip_field_for_tag(tag, &mut is).is_err() {
                break;
            }
            continue;
        }

        // Consume the whole length-delimited field *before* decoding it. Decoding in place
        // and then continuing after an error would resume from the middle of the nested
        // message, so following bytes would be misread as top-level fields. With the field
        // fully consumed, the outer stream always sits on a field boundary.
        let raw_payload = match is.read_bytes() {
            Ok(bytes) => bytes,
            Err(_) => break,
        };
        let idx_payload = match <idx_proto::TracerPayload as protobuf::Message>::parse_from_bytes(&raw_payload) {
            Ok(decoded) => decoded,
            // Malformed entry: drop it, but keep scanning the remaining top-level fields.
            Err(_) => continue,
        };

        let strings = &idx_payload.strings;
        let tracer_metadata = TracerMetadata::from_idx_payload(&idx_payload);

        for chunk in &idx_payload.chunks {
            let trace_chunk_metadata = TraceChunkMetadata::from_idx_chunk(chunk, strings);
            let trace_id = trace_id_low_from_bytes(&chunk.traceID);

            spans.extend(chunk.spans.iter().map(|span| {
                Self::from_idx_proto(
                    agent_metadata.clone(),
                    tracer_metadata.clone(),
                    trace_chunk_metadata.clone(),
                    span,
                    trace_id,
                    strings,
                )
            }));
        }
    }

    spans
}

fn from_proto(
agent_metadata: AgentMetadata, tracer_metadata: TracerMetadata, trace_chunk_metadata: TraceChunkMetadata,
value: &proto::Span,
Expand Down Expand Up @@ -205,6 +283,48 @@ impl Span {
span_events,
}
}

/// Builds a simplified span from an idx span.
///
/// `strings` is the string table of the enclosing idx tracer payload; all `*Ref` fields and
/// attribute keys/values are resolved against it. `trace_id` is supplied by the caller because
/// the idx span itself is not consulted for it here.
///
/// Span links are sorted by `(trace_id, trace_id_high, span_id)` and span events by
/// `time_unix_nano`; both sorts are stable, so entries with equal keys keep their input order.
fn from_idx_proto(
    agent_metadata: AgentMetadata, tracer_metadata: TracerMetadata, trace_chunk_metadata: TraceChunkMetadata,
    span: &idx_proto::Span, trace_id: u64, strings: &[String],
) -> Self {
    let lookup = |index: u32| resolve_ref(strings, index);

    // The single idx attribute map is fanned out into the three classic maps.
    let (meta, metrics, meta_struct) = split_idx_span_attributes(&span.attributes, strings);

    let mut span_links: Vec<_> = span.links.iter().map(|link| SpanLink::from_idx(link, strings)).collect();
    span_links.sort_by_key(|link| (link.trace_id, link.trace_id_high, link.span_id));

    let mut span_events: Vec<_> = span.events.iter().map(|event| SpanEvent::from_idx(event, strings)).collect();
    span_events.sort_by_key(|event| event.time_unix_nano);

    let service = lookup(span.serviceRef);
    let name = lookup(span.nameRef);
    let resource = lookup(span.resourceRef);
    let type_ = lookup(span.typeRef);

    Self {
        agent_metadata,
        tracer_metadata,
        trace_chunk_metadata,
        service,
        name,
        resource,
        trace_id,
        span_id: span.spanID,
        parent_id: span.parentID,
        // `start` and `duration` are stored as `i64` on the simplified span.
        start: span.start as i64,
        duration: span.duration as i64,
        error: i32::from(span.error),
        meta,
        metrics,
        type_,
        meta_struct,
        span_links,
        span_events,
    }
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
Expand Down Expand Up @@ -234,6 +354,20 @@ impl From<&proto::SpanLink> for SpanLink {
}
}

impl SpanLink {
    /// Builds a span link from an idx `SpanLink`, resolving string references against `strings`.
    ///
    /// The link's `traceID` bytes are split into low/high halves by `trace_id_parts_from_bytes`.
    fn from_idx(link: &idx_proto::SpanLink, strings: &[String]) -> Self {
        let (low, high) = trace_id_parts_from_bytes(&link.traceID);
        let attributes = string_attrs_from_idx(&link.attributes, strings);
        let tracestate = resolve_ref(strings, link.tracestateRef);

        Self {
            trace_id: low,
            trace_id_high: high,
            span_id: link.spanID,
            attributes,
            tracestate,
            flags: link.flags,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
struct SpanEvent {
time_unix_nano: u64,
Expand All @@ -255,6 +389,24 @@ impl From<&proto::SpanEvent> for SpanEvent {
}
}

impl SpanEvent {
    /// Builds a span event from an idx `SpanEvent`, resolving string references against `strings`.
    ///
    /// Attributes whose value `idx_anyvalue_to_event_attr` cannot convert are dropped.
    fn from_idx(event: &idx_proto::SpanEvent, strings: &[String]) -> Self {
        let name = resolve_ref(strings, event.nameRef);
        let attributes = event
            .attributes
            .iter()
            .filter_map(|(key_ref, any)| {
                // Only resolve the key once the value is known to be representable.
                idx_anyvalue_to_event_attr(any, strings).map(|attr| (resolve_ref(strings, *key_ref), attr))
            })
            .collect();

        Self {
            // The idx proto renamed time_unix_nano to `time` (same semantics).
            time_unix_nano: event.time,
            name,
            attributes,
        }
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
enum AttributeAnyValue {
String(MetaString),
Expand Down Expand Up @@ -304,3 +456,128 @@ impl From<&proto::AttributeArrayValue> for AttributeArrayValue {
}
}
}

// ---------------------------------------------------------------------------
// idx helpers
// ---------------------------------------------------------------------------

/// Split return type for [`split_idx_span_attributes`]: (meta, metrics, meta_struct).
///
/// The tuple elements are, in order:
///
/// 1. `meta`: attribute key to string value,
/// 2. `metrics`: attribute key to numeric value,
/// 3. `meta_struct`: attribute key to raw bytes.
type SpanAttributeSplit = (
FastHashMap<MetaString, MetaString>,
FastHashMap<MetaString, WrappedFloat>,
FastHashMap<MetaString, Vec<u8>>,
);

/// Looks up string-table index `r` in `strings` and returns the entry as a `MetaString`.
///
/// An index past the end of the table yields `MetaString::default()` rather than an error.
fn resolve_ref(strings: &[String], r: u32) -> MetaString {
    match strings.get(r as usize) {
        Some(entry) => MetaString::from(entry.as_str()),
        None => MetaString::default(),
    }
}

/// Splits a 16-byte big-endian trace ID into its two 64-bit halves.
///
/// Returns `(trace_id_low, trace_id_high)`: the first eight bytes form the high half and the
/// last eight bytes the low half. Any slice that is not exactly 16 bytes long yields `(0, 0)`.
fn trace_id_parts_from_bytes(bytes: &[u8]) -> (u64, u64) {
    match <[u8; 16]>::try_from(bytes) {
        Ok(raw) => {
            let full = u128::from_be_bytes(raw);
            // Truncation keeps the low 64 bits; the shift exposes the high 64 bits.
            (full as u64, (full >> 64) as u64)
        }
        Err(_) => (0, 0),
    }
}

/// Returns the low 64 bits of a 16-byte big-endian trace ID, as computed by
/// `trace_id_parts_from_bytes` (so `0` for any slice that is not 16 bytes long).
fn trace_id_low_from_bytes(bytes: &[u8]) -> u64 {
    let (low, _high) = trace_id_parts_from_bytes(bytes);
    low
}

/// Builds a string-to-string map from the string-valued entries of an idx attribute map.
///
/// Keys and `StringValueRef` values are resolved against `strings`; entries holding any other
/// kind of value (or no value at all) are left out.
fn string_attrs_from_idx(
    attrs: &std::collections::HashMap<u32, idx_proto::AnyValue>, strings: &[String],
) -> FastHashMap<MetaString, MetaString> {
    attrs
        .iter()
        .filter_map(|(key_ref, any)| match &any.value {
            Some(idx_proto::any_value::Value::StringValueRef(value_ref)) => {
                let value = resolve_ref(strings, *value_ref);
                Some((resolve_ref(strings, *key_ref), value))
            }
            _ => None,
        })
        .collect()
}

/// Fans an idx span attribute map out into the three stele maps, returned as
/// `(meta, metrics, meta_struct)`.
///
/// - string references and booleans go to `meta` (booleans as `"true"` / `"false"`),
/// - doubles and integers go to `metrics` (integers are cast to `f64`),
/// - byte values go to `meta_struct`,
/// - every other value kind, and entries without a value, are ignored.
fn split_idx_span_attributes(
    attrs: &std::collections::HashMap<u32, idx_proto::AnyValue>, strings: &[String],
) -> SpanAttributeSplit {
    let mut meta = FastHashMap::default();
    let mut metrics = FastHashMap::default();
    let mut meta_struct = FastHashMap::default();

    for (key_ref, any) in attrs.iter() {
        let key = resolve_ref(strings, *key_ref);
        let value = match any.value.as_ref() {
            Some(value) => value,
            None => continue,
        };

        match value {
            idx_proto::any_value::Value::StringValueRef(index) => {
                meta.insert(key, resolve_ref(strings, *index));
            }
            idx_proto::any_value::Value::BoolValue(flag) => {
                let text = if *flag { "true" } else { "false" };
                meta.insert(key, MetaString::from(text));
            }
            idx_proto::any_value::Value::DoubleValue(number) => {
                metrics.insert(key, WrappedFloat(OrderedFloat(*number)));
            }
            idx_proto::any_value::Value::IntValue(number) => {
                metrics.insert(key, WrappedFloat(OrderedFloat(*number as f64)));
            }
            idx_proto::any_value::Value::BytesValue(bytes) => {
                meta_struct.insert(key, bytes.clone());
            }
            _ => {}
        }
    }

    (meta, metrics, meta_struct)
}

/// Maps an idx `AnyValue` onto the stele `AttributeAnyValue` used for span event attributes.
///
/// Strings (resolved through `strings`), booleans, integers, doubles and arrays are supported;
/// array elements that `idx_anyvalue_to_array_attr` cannot convert are dropped from the array.
/// Returns `None` when the value is unset or of any other kind.
fn idx_anyvalue_to_event_attr(v: &idx_proto::AnyValue, strings: &[String]) -> Option<AttributeAnyValue> {
    let converted = match v.value.as_ref()? {
        idx_proto::any_value::Value::StringValueRef(index) => AttributeAnyValue::String(resolve_ref(strings, *index)),
        idx_proto::any_value::Value::BoolValue(flag) => AttributeAnyValue::Boolean(*flag),
        idx_proto::any_value::Value::IntValue(number) => AttributeAnyValue::Integer(*number),
        idx_proto::any_value::Value::DoubleValue(number) => {
            AttributeAnyValue::Double(WrappedFloat(OrderedFloat(*number)))
        }
        idx_proto::any_value::Value::ArrayValue(array) => AttributeAnyValue::Array(
            array
                .values
                .iter()
                .filter_map(|element| idx_anyvalue_to_array_attr(element, strings))
                .collect(),
        ),
        _ => return None,
    };

    Some(converted)
}

/// Maps an idx `AnyValue` onto the stele `AttributeArrayValue` used for the elements of
/// array-typed span event attributes.
///
/// Only strings (resolved through `strings`), booleans, integers and doubles are supported;
/// an unset value or any other kind yields `None`.
fn idx_anyvalue_to_array_attr(v: &idx_proto::AnyValue, strings: &[String]) -> Option<AttributeArrayValue> {
    let converted = match v.value.as_ref()? {
        idx_proto::any_value::Value::StringValueRef(index) => {
            AttributeArrayValue::String(resolve_ref(strings, *index))
        }
        idx_proto::any_value::Value::BoolValue(flag) => AttributeArrayValue::Boolean(*flag),
        idx_proto::any_value::Value::IntValue(number) => AttributeArrayValue::Integer(*number),
        idx_proto::any_value::Value::DoubleValue(number) => {
            AttributeArrayValue::Double(WrappedFloat(OrderedFloat(*number)))
        }
        _ => return None,
    };

    Some(converted)
}
14 changes: 14 additions & 0 deletions lib/protos/datadog/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,20 @@ fn main() {
.customize_callback(SerdeCapableStructs)
.run_from_script();

// Separate invocation for idx proto types to avoid filename collision with
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally a take-it-or-leave-it thing, but it feels very opaque as an outsider to see "idx proto types" instead of just "v1"... like "v1" is immediately grokable/intuitive, but "idx" less so. Like we talk about it as the v1 protocol internally, not the "idx protocol."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this is kind of my bad. See my other comment on trace versioning. I've started referring to the project as the "Efficient Trace Payload" (ETP) to avoid the "V1" naming collision that happens everywhere. I went with `idx` in the trace-agent code since the biggest difference is that strings are "indexed", but I'm fine with renaming this to whatever is clearest here :P

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ETP or v1 make the most sense since they give something more tangible / intuitive to use when discussing.

// `trace_protos/tracer_payload.rs` — both directories have a file named
// `tracer_payload.proto` so they cannot share an output directory.
protobuf_codegen::Codegen::new()
.protoc()
.includes(["proto/datadog-agent"])
.inputs([
"proto/datadog-agent/datadog/trace/idx/tracer_payload.proto",
"proto/datadog-agent/datadog/trace/idx/span.proto",
])
.cargo_out_dir("idx_trace_protos")
.customize(codegen_customize.clone())
.run_from_script();

protobuf_codegen::Codegen::new()
.protoc()
.includes(["proto/sketches-go"])
Expand Down
Loading
Loading