Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rearrange msgpack decoder folder structure. #877

Merged
merged 2 commits into from
Feb 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
mod tests {
use super::*;
use crate::error::ddog_trace_exporter_error_free;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::{borrow::Borrow, mem::MaybeUninit};
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;

fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
let start = now.duration_since(time::UNIX_EPOCH).unwrap() + Duration::from_secs(10 * n);
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! This includes the aggregation key to group spans together and the computation of stats from a
//! span.
use datadog_trace_protobuf::pb;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};
use std::borrow::Borrow;
use std::borrow::Cow;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::collections::HashMap;
use std::time::{self, Duration, SystemTime};

use datadog_trace_protobuf::pb;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};

use aggregation::{AggregationKey, StatsBucket};

Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use datadog_trace_utils::span_v04::trace_utils::compute_top_level_span;
use datadog_trace_utils::span::v04::trace_utils::compute_top_level_span;
use rand::{thread_rng, Rng};

const BUCKET_SIZE: u64 = Duration::from_secs(2).as_nanos() as u64;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/stats_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ pub fn stats_url_from_agent_url(agent_url: &str) -> anyhow::Result<hyper::Uri> {
#[cfg(test)]
mod tests {
use super::*;
use datadog_trace_utils::span_v04::{trace_utils, Span};
use datadog_trace_utils::span::v04::{trace_utils, Span};
use httpmock::prelude::*;
use httpmock::MockServer;
use time::Duration;
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/trace_exporter/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::telemetry::error::TelemetryError;
use crate::trace_exporter::msgpack_decoder::v04::error::DecodeError;
use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError;
use hyper::http::StatusCode;
use hyper::Error as HyperError;
use serde_json::error::Error as SerdeError;
Expand Down
6 changes: 3 additions & 3 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::span_v04::{
use datadog_trace_utils::span::v04::{
trace_utils::{compute_top_level_span, has_top_level},
Span,
};
Expand Down Expand Up @@ -588,7 +588,7 @@ impl TraceExporter {

fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result<String, TraceExporterError> {
// TODO base on input format
let (mut traces, size) = match msgpack_decoder::v04::decoder::from_slice(data) {
let (mut traces, size) = match msgpack_decoder::v04::from_slice(data) {
Ok(res) => res,
Err(err) => {
error!("Error deserializing trace from request body: {err}");
Expand Down Expand Up @@ -976,7 +976,7 @@ mod tests {
use self::error::AgentErrorKind;
use self::error::BuilderErrorKind;
use super::*;
use datadog_trace_utils::span_v04::Span;
use datadog_trace_utils::span::v04::Span;
use httpmock::prelude::*;
use httpmock::MockServer;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion trace-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub mod trace_utils;
pub mod tracer_header_tags;
pub mod tracer_payload;

pub mod span_v04;
pub mod span;
88 changes: 88 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode, decode::RmpRead, Marker};
use std::collections::HashMap;
use tinybytes::Bytes;

/// Reads a map from the buffer and returns it as a `HashMap`.
///
/// This function is generic over the key and value types of the map, and it uses a provided
/// function to read key-value pairs from the buffer.
///
/// # Arguments
///
/// * `len` - The number of key-value pairs to read from the buffer.
/// * `buf` - A reference to the Bytes containing the encoded map data.
/// * `read_pair` - A function that reads a key-value pair from the buffer and returns it as a
/// `Result<(K, V), DecodeError>`.
///
/// # Returns
///
/// * `Ok(HashMap<K, V>)` - A `HashMap` containing the decoded key-value pairs if successful.
/// * `Err(DecodeError)` - An error if the decoding process fails.
///
/// # Errors
///
/// This function will return an error if:
/// - The `read_pair` function returns an error while reading a key-value pair.
///
/// # Type Parameters
///
/// * `K` - The type of the keys in the map. Must implement `std::hash::Hash` and `Eq`.
/// * `V` - The type of the values in the map.
/// * `F` - The type of the function used to read key-value pairs from the buffer.
#[inline]
pub fn read_map<K, V, F>(
len: usize,
buf: &mut Bytes,
read_pair: F,
) -> Result<HashMap<K, V>, DecodeError>
where
K: std::hash::Hash + Eq,
F: Fn(&mut Bytes) -> Result<(K, V), DecodeError>,
{
let mut map = HashMap::with_capacity(len);
for _ in 0..len {
let (k, v) = read_pair(buf)?;
map.insert(k, v);
}
Ok(map)
}

/// Reads map length from the buffer
///
/// # Arguments
///
/// * `buf` - A reference to the Bytes containing the encoded map data.
///
/// # Returns
///
/// * `Ok(usize)` - Map length.
/// * `Err(DecodeError)` - An error if the decoding process fails.
///
/// # Errors
///
/// This function will return an error if:
/// - The buffer does not contain a map.
/// - There is an error reading from the buffer.
#[inline]
pub fn read_map_len(buf: &mut &[u8]) -> Result<usize, DecodeError> {
match decode::read_marker(buf)
.map_err(|_| DecodeError::InvalidFormat("Unable to read marker for map".to_owned()))?
{
Marker::FixMap(len) => Ok(len as usize),
Marker::Map16 => buf
.read_data_u16()
.map_err(|_| DecodeError::IOError)
.map(|len| len as usize),
Marker::Map32 => buf
.read_data_u32()
.map_err(|_| DecodeError::IOError)
.map(|len| len as usize),
_ => Err(DecodeError::InvalidType(
"Unable to read map from buffer".to_owned(),
)),
}
}
35 changes: 35 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/meta_struct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use rmp::decode;
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

#[inline]
pub fn read_meta_struct(buf: &mut Bytes) -> Result<HashMap<BytesString, Vec<u8>>, DecodeError> {
if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
return Ok(empty_map);
}

fn read_meta_struct_pair(buf: &mut Bytes) -> Result<(BytesString, Vec<u8>), DecodeError> {
let key = read_string_bytes(buf)?;
let array_len = decode::read_array_len(unsafe { buf.as_mut_slice() }).map_err(|_| {
DecodeError::InvalidFormat("Unable to read array len for meta_struct".to_owned())
})?;

let mut v = Vec::with_capacity(array_len as usize);

for _ in 0..array_len {
let value = read_number_bytes(buf)?;
v.push(value);
}
Ok((key, v))
}

let len = read_map_len(unsafe { buf.as_mut_slice() })?;
read_map(len, buf, read_meta_struct_pair)
}
27 changes: 27 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::map::{read_map, read_map_len};
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{handle_null_marker, read_string_bytes};
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

#[inline]
pub fn read_metric_pair(buf: &mut Bytes) -> Result<(BytesString, f64), DecodeError> {
let key = read_string_bytes(buf)?;
let v = read_number_bytes(buf)?;

Ok((key, v))
}
#[inline]
pub fn read_metrics(buf: &mut Bytes) -> Result<HashMap<BytesString, f64>, DecodeError> {
if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
return Ok(empty_map);
}

let len = read_map_len(unsafe { buf.as_mut_slice() })?;

read_map(len, buf, read_metric_pair)
}
10 changes: 10 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod error;
pub mod map;
pub mod meta_struct;
pub mod metrics;
pub mod number;
pub mod span_link;
pub mod string;
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use super::error::DecodeError;
use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::{decode::RmpRead, Marker};
use std::fmt;
use tinybytes::Bytes;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::v04::decoder::{
use crate::msgpack_decoder::decode::error::DecodeError;
use crate::msgpack_decoder::decode::number::read_number_bytes;
use crate::msgpack_decoder::decode::string::{
handle_null_marker, read_str_map_to_bytes_strings, read_string_bytes, read_string_ref,
};
use crate::msgpack_decoder::v04::error::DecodeError;
use crate::msgpack_decoder::v04::number::read_number_bytes;
use crate::span_v04::SpanLink;
use crate::span::v04::SpanLink;
use rmp::Marker;
use std::str::FromStr;
use tinybytes::Bytes;
Expand Down Expand Up @@ -98,7 +98,7 @@ fn decode_span_link(buf: &mut Bytes) -> Result<SpanLink, DecodeError> {
#[cfg(test)]
mod tests {
use super::SpanLinkKey;
use crate::msgpack_decoder::v04::error::DecodeError;
use crate::msgpack_decoder::decode::error::DecodeError;
use std::str::FromStr;

#[test]
Expand Down
97 changes: 97 additions & 0 deletions trace-utils/src/msgpack_decoder/decode/string.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use crate::msgpack_decoder::decode::error::DecodeError;
use rmp::decode;
use rmp::decode::DecodeStringError;
use std::collections::HashMap;
use tinybytes::{Bytes, BytesString};

// https://docs.rs/rmp/latest/rmp/enum.Marker.html#variant.Null (0xc0 == 192)
const NULL_MARKER: &u8 = &0xc0;

#[inline]
pub fn read_string_ref_nomut(buf: &[u8]) -> Result<(&str, &[u8]), DecodeError> {
decode::read_str_from_slice(buf).map_err(|e| match e {
DecodeStringError::InvalidMarkerRead(e) => DecodeError::InvalidFormat(e.to_string()),
DecodeStringError::InvalidDataRead(e) => DecodeError::InvalidConversion(e.to_string()),
DecodeStringError::TypeMismatch(marker) => {
DecodeError::InvalidType(format!("Type mismatch at marker {:?}", marker))
}
DecodeStringError::InvalidUtf8(_, e) => DecodeError::Utf8Error(e.to_string()),
_ => DecodeError::IOError,
})
}

#[inline]
pub fn read_string_ref<'a>(buf: &mut &'a [u8]) -> Result<&'a str, DecodeError> {
read_string_ref_nomut(buf).map(|(str, newbuf)| {
*buf = newbuf;
str
})
}

#[inline]
pub fn read_string_bytes(buf: &mut Bytes) -> Result<BytesString, DecodeError> {
// Note: we need to pass a &'static lifetime here, otherwise it'll complain
read_string_ref_nomut(unsafe { buf.as_mut_slice() }).map(|(str, newbuf)| {
let string = BytesString::from_bytes_slice(buf, str);
*unsafe { buf.as_mut_slice() } = newbuf;
string
})
}

#[inline]
pub fn read_nullable_string_bytes(buf: &mut Bytes) -> Result<BytesString, DecodeError> {
if let Some(empty_string) = handle_null_marker(buf, BytesString::default) {
Ok(empty_string)
} else {
read_string_bytes(buf)
}
}

#[inline]
// Safety: read_string_ref checks utf8 validity, so we don't do it again when creating the
// BytesStrings.
pub fn read_str_map_to_bytes_strings(
buf: &mut Bytes,
) -> Result<HashMap<BytesString, BytesString>, DecodeError> {
let len = decode::read_map_len(unsafe { buf.as_mut_slice() })
.map_err(|_| DecodeError::InvalidFormat("Unable to get map len for str map".to_owned()))?;

let mut map = HashMap::with_capacity(len.try_into().expect("Unable to cast map len to usize"));
for _ in 0..len {
let key = read_string_bytes(buf)?;
let value = read_string_bytes(buf)?;
map.insert(key, value);
}
Ok(map)
}

#[inline]
pub fn read_nullable_str_map_to_bytes_strings(
buf: &mut Bytes,
) -> Result<HashMap<BytesString, BytesString>, DecodeError> {
if let Some(empty_map) = handle_null_marker(buf, HashMap::default) {
return Ok(empty_map);
}

read_str_map_to_bytes_strings(buf)
}

/// When you want to "peek" if the next value is a null marker, and only advance the buffer if it is
/// null and return the default value. If it is not null, you can continue to decode as expected.
#[inline]
pub fn handle_null_marker<T, F>(buf: &mut Bytes, default: F) -> Option<T>
where
F: FnOnce() -> T,
{
let slice = unsafe { buf.as_mut_slice() };

if slice.first() == Some(NULL_MARKER) {
*slice = &slice[1..];
Some(default())
} else {
None
}
}
1 change: 1 addition & 0 deletions trace-utils/src/msgpack_decoder/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

pub mod decode;
pub mod v04;
Loading
Loading