diff --git a/data-pipeline-ffi/src/error.rs b/data-pipeline-ffi/src/error.rs index 3cf5b01b1..bfe7dea65 100644 --- a/data-pipeline-ffi/src/error.rs +++ b/data-pipeline-ffi/src/error.rs @@ -119,7 +119,7 @@ impl From for ExporterError { ExporterErrorCode::HttpUnknown } } - TraceExporterError::Serde(_) => ExporterErrorCode::Serde, + TraceExporterError::Serialization(_) => ExporterErrorCode::Serde, }; ExporterError::new(code, &value.to_string()) } diff --git a/data-pipeline/src/telemetry/mod.rs b/data-pipeline/src/telemetry/mod.rs index 9173c47cb..fda997527 100644 --- a/data-pipeline/src/telemetry/mod.rs +++ b/data-pipeline/src/telemetry/mod.rs @@ -6,12 +6,15 @@ pub mod error; pub mod metrics; use crate::telemetry::error::TelemetryError; use crate::telemetry::metrics::Metrics; -use datadog_trace_utils::trace_utils::SendDataResult; +use datadog_trace_utils::{ + send_with_retry::{SendWithRetryError, SendWithRetryResult}, + trace_utils::SendDataResult, +}; use ddcommon::tag::Tag; use ddtelemetry::worker::{ LifecycleAction, TelemetryActions, TelemetryWorkerBuilder, TelemetryWorkerHandle, }; -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use tokio::task::JoinHandle; /// Structure to build a Telemetry client. @@ -108,13 +111,83 @@ pub struct TelemetryClient { handle: JoinHandle<()>, } +/// Telemetry describing the sending of a trace payload +/// It can be produced from a [`SendWithRetryResult`] or from a [`SendDataResult`]. +#[derive(PartialEq, Debug, Default)] +pub struct SendPayloadTelemetry { + requests_count: u64, + errors_network: u64, + errors_timeout: u64, + errors_status_code: u64, + bytes_sent: u64, + chunks_sent: u64, + chunks_dropped: u64, + responses_count_per_code: HashMap, +} + +impl From<&SendDataResult> for SendPayloadTelemetry { + fn from(value: &SendDataResult) -> Self { + Self { + requests_count: value.requests_count, + errors_network: value.errors_network, + errors_timeout: value.errors_timeout, + errors_status_code: value.errors_status_code, + bytes_sent: value.bytes_sent, + chunks_sent: value.chunks_sent, + chunks_dropped: value.chunks_dropped, + responses_count_per_code: value.responses_count_per_code.clone(), + } + } +} + +impl SendPayloadTelemetry { + /// Create a [`SendPayloadTelemetry`] from a [`SendWithRetryResult`]. + pub fn from_retry_result(value: &SendWithRetryResult, bytes_sent: u64, chunks: u64) -> Self { + let mut telemetry = Self::default(); + match value { + Ok((response, attempts)) => { + telemetry.chunks_sent = chunks; + telemetry.bytes_sent = bytes_sent; + telemetry + .responses_count_per_code + .insert(response.status().into(), 1); + telemetry.requests_count = *attempts as u64; + } + Err(err) => { + telemetry.chunks_dropped = chunks; + match err { + SendWithRetryError::Http(response, attempts) => { + telemetry.errors_status_code = 1; + telemetry + .responses_count_per_code + .insert(response.status().into(), 1); + telemetry.requests_count = *attempts as u64; + } + SendWithRetryError::Timeout(attempts) => { + telemetry.errors_timeout = 1; + telemetry.requests_count = *attempts as u64; + } + SendWithRetryError::Network(_, attempts) => { + telemetry.errors_network = 1; + telemetry.requests_count = *attempts as u64; + } + SendWithRetryError::Build(attempts) => { + telemetry.requests_count = *attempts as u64; + } + } + } + }; + telemetry + } +} + impl TelemetryClient { /// Sends metrics to the agent using a telemetry worker handle. /// /// # Arguments: /// /// * `telemetry_handle`: telemetry worker handle used to enqueue metrics. - pub fn send(&self, data: &SendDataResult) -> Result<(), TelemetryError> { + pub fn send(&self, data: &SendPayloadTelemetry) -> Result<(), TelemetryError> { if data.requests_count > 0 { let key = self.metrics.get(metrics::MetricKind::ApiRequest); self.worker @@ -188,11 +261,24 @@ impl TelemetryClient { mod tests { use httpmock::Method::POST; use httpmock::MockServer; + use hyper::{Response, StatusCode}; use regex::Regex; - use std::collections::HashMap; use super::*; + async fn get_test_client(url: &str) -> TelemetryClient { + TelemetryClientBuilder::default() + .set_service_name("test_service") + .set_language("test_language") + .set_language_version("test_language_version") + .set_tracer_version("test_tracer_version") + .set_url(url) + .set_hearbeat(100) + .build() + .await + .unwrap() + } + #[test] fn builder_test() { let builder = TelemetryClientBuilder::default() @@ -244,25 +330,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { bytes_sent: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -283,25 +356,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { requests_count: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -322,25 +382,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { responses_count_per_code: HashMap::from([(200, 1)]), ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -361,25 +408,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { errors_timeout: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -400,25 +434,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { errors_network: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -439,25 +460,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { errors_status_code: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -467,7 +475,7 @@ mod tests { #[cfg_attr(miri, ignore)] #[tokio::test] - async fn errors_chunks_sent_test() { + async fn chunks_sent_test() { let payload = Regex::new(r#""metric":"trace_chunk_sent","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count"#).unwrap(); let server = MockServer::start_async().await; @@ -478,25 +486,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { chunks_sent: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -506,7 +501,7 @@ mod tests { #[cfg_attr(miri, ignore)] #[tokio::test] - async fn errors_chunks_dropped_test() { + async fn chunks_dropped_test() { let payload = Regex::new(r#""metric":"trace_chunk_dropped","points":\[\[\d+,1\.0\]\],"tags":\["src_library:libdatadog"\],"common":true,"type":"count"#).unwrap(); let server = MockServer::start_async().await; @@ -517,25 +512,12 @@ mod tests { }) .await; - let result = TelemetryClientBuilder::default() - .set_service_name("test_service") - .set_language("test_language") - .set_language_version("test_language_version") - .set_tracer_version("test_tracer_version") - .set_url(&server.url("/")) - .set_hearbeat(100) - .build() - .await; - - assert!(result.is_ok()); - - let data = SendDataResult { - last_result: Ok(hyper::Response::default()), + let data = SendPayloadTelemetry { chunks_dropped: 1, ..Default::default() }; - let client = result.unwrap(); + let client = get_test_client(&server.url("/")).await; client.start().await; let _ = client.send(&data); @@ -543,6 +525,120 @@ mod tests { telemetry_srv.assert_hits_async(1).await; } + #[test] + fn telemetry_from_ok_response_test() { + let result = Ok((Response::default(), 3)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 4, 5); + assert_eq!( + telemetry, + SendPayloadTelemetry { + bytes_sent: 4, + chunks_sent: 5, + requests_count: 3, + responses_count_per_code: HashMap::from([(200, 1)]), + ..Default::default() + } + ) + } + + #[test] + fn telemetry_from_request_error_test() { + let mut error_response = Response::default(); + *error_response.status_mut() = StatusCode::BAD_REQUEST; + let result = Err(SendWithRetryError::Http(error_response, 5)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + assert_eq!( + telemetry, + SendPayloadTelemetry { + chunks_dropped: 2, + requests_count: 5, + errors_status_code: 1, + responses_count_per_code: HashMap::from([(400, 1)]), + ..Default::default() + } + ) + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn telemetry_from_network_error_test() { + // Create an hyper error by calling an undefined service + let hyper_error = hyper::Client::new() + .get(hyper::Uri::from_static("localhost:12345")) + .await + .unwrap_err(); + + let result = Err(SendWithRetryError::Network(hyper_error, 5)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + assert_eq!( + telemetry, + SendPayloadTelemetry { + chunks_dropped: 2, + requests_count: 5, + errors_network: 1, + ..Default::default() + } + ) + } + + #[test] + fn telemetry_from_timeout_error_test() { + let result = Err(SendWithRetryError::Timeout(5)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + assert_eq!( + telemetry, + SendPayloadTelemetry { + chunks_dropped: 2, + requests_count: 5, + errors_timeout: 1, + ..Default::default() + } + ) + } + + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn telemetry_from_build_error_test() { + let result = Err(SendWithRetryError::Build(5)); + let telemetry = SendPayloadTelemetry::from_retry_result(&result, 1, 2); + assert_eq!( + telemetry, + SendPayloadTelemetry { + chunks_dropped: 2, + requests_count: 5, + ..Default::default() + } + ) + } + + #[test] + fn telemetry_from_send_data_result_test() { + let result = SendDataResult { + requests_count: 10, + responses_count_per_code: HashMap::from([(200, 3)]), + errors_timeout: 1, + errors_network: 2, + errors_status_code: 3, + bytes_sent: 4, + chunks_sent: 5, + chunks_dropped: 6, + ..Default::default() + }; + + let expected_telemetry = SendPayloadTelemetry { + requests_count: 10, + errors_network: 2, + errors_timeout: 1, + errors_status_code: 3, + bytes_sent: 4, + chunks_sent: 5, + chunks_dropped: 6, + responses_count_per_code: HashMap::from([(200, 3)]), + }; + + assert_eq!(SendPayloadTelemetry::from(&result), expected_telemetry) + } + #[cfg_attr(miri, ignore)] #[tokio::test] async fn runtime_id_test() { @@ -566,8 +662,6 @@ mod tests { .build() .await; - assert!(result.is_ok()); - let client = result.unwrap(); client.start().await; diff --git a/data-pipeline/src/trace_exporter/error.rs b/data-pipeline/src/trace_exporter/error.rs index 4bc30f0e3..d03f982af 100644 --- a/data-pipeline/src/trace_exporter/error.rs +++ b/data-pipeline/src/trace_exporter/error.rs @@ -5,7 +5,7 @@ use crate::telemetry::error::TelemetryError; use crate::trace_exporter::msgpack_decoder::decode::error::DecodeError; use hyper::http::StatusCode; use hyper::Error as HyperError; -use serde_json::error::Error as SerdeError; +use rmp_serde::encode::Error as EncodeError; use std::error::Error; use std::fmt::{Debug, Display}; @@ -111,18 +111,25 @@ impl RequestError { /// TraceExporterError holds different types of errors that occur when handling traces. #[derive(Debug)] pub enum TraceExporterError { + /// Error in agent response processing. Agent(AgentErrorKind), + /// Invalid builder input. Builder(BuilderErrorKind), + /// Error in deserialization of incoming trace payload. Deserialization(DecodeError), + /// Generic IO error. Io(std::io::Error), + /// Network related error (i.e. hyper error). Network(NetworkError), + /// Agent responded with an error code. Request(RequestError), - Serde(SerdeError), + /// Error in serialization of processed trace payload. + Serialization(EncodeError), } -impl From for TraceExporterError { - fn from(value: SerdeError) -> Self { - TraceExporterError::Serde(value) +impl From for TraceExporterError { + fn from(value: EncodeError) -> Self { + TraceExporterError::Serialization(value) } } @@ -188,7 +195,7 @@ impl Display for TraceExporterError { TraceExporterError::Io(e) => std::fmt::Display::fmt(e, f), TraceExporterError::Network(e) => std::fmt::Display::fmt(e, f), TraceExporterError::Request(e) => std::fmt::Display::fmt(e, f), - TraceExporterError::Serde(e) => std::fmt::Display::fmt(e, f), + TraceExporterError::Serialization(e) => std::fmt::Display::fmt(e, f), } } } diff --git a/data-pipeline/src/trace_exporter/mod.rs b/data-pipeline/src/trace_exporter/mod.rs index a378d4e27..fcdd7bb66 100644 --- a/data-pipeline/src/trace_exporter/mod.rs +++ b/data-pipeline/src/trace_exporter/mod.rs @@ -3,7 +3,7 @@ pub mod agent_response; pub mod error; use crate::agent_info::{AgentInfoArc, AgentInfoFetcher}; -use crate::telemetry::{TelemetryClient, TelemetryClientBuilder}; +use crate::telemetry::{SendPayloadTelemetry, TelemetryClient, TelemetryClientBuilder}; use crate::trace_exporter::error::{RequestError, TraceExporterError}; use crate::{ health_metrics, health_metrics::HealthMetric, span_concentrator::SpanConcentrator, @@ -11,21 +11,25 @@ use crate::{ }; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; +use datadog_trace_utils::msgpack_decoder; +use datadog_trace_utils::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryError}; use datadog_trace_utils::span::v04::{ trace_utils::{compute_top_level_span, has_top_level}, Span, }; -use datadog_trace_utils::trace_utils::{self, SendData, TracerHeaderTags}; -use datadog_trace_utils::tracer_payload::TraceCollection; -use datadog_trace_utils::{msgpack_decoder, tracer_payload}; +use datadog_trace_utils::trace_utils::TracerHeaderTags; +use ddcommon::header::{ + APPLICATION_MSGPACK_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, DATADOG_TRACE_COUNT_STR, +}; use ddcommon::tag::Tag; use ddcommon::{connector, tag, Endpoint}; use dogstatsd_client::{new_flusher, Client, DogStatsDAction}; use either::Either; use hyper::body::HttpBody; use hyper::http::uri::PathAndQuery; -use hyper::{Body, Method, Uri}; +use hyper::{header::CONTENT_TYPE, Body, Method, Uri}; use log::{error, info}; +use std::io; use std::sync::{Arc, Mutex}; use std::time::Duration; use std::{borrow::Borrow, collections::HashMap, str::FromStr, time}; @@ -588,7 +592,7 @@ impl TraceExporter { fn send_deser_ser(&self, data: tinybytes::Bytes) -> Result { // TODO base on input format - let (mut traces, size) = match msgpack_decoder::v04::from_slice(data) { + let (mut traces, _) = match msgpack_decoder::v04::from_slice(data) { Ok(res) => res, Err(err) => { error!("Error deserializing trace from request body: {err}"); @@ -631,26 +635,38 @@ impl TraceExporter { match self.output_format { TraceExporterOutputFormat::V04 => { - let tracer_payload = trace_utils::collect_trace_chunks( - TraceCollection::V04(traces), - &header_tags, - &mut tracer_payload::DefaultTraceChunkProcessor, - self.endpoint.api_key.is_some(), - ); + let payload = rmp_serde::to_vec_named(&traces).map_err(TraceExporterError::from)?; + let chunks = traces.len(); + let payload_len = payload.len(); let endpoint = Endpoint { url: self.output_format.add_path(&self.endpoint.url), ..self.endpoint.clone() }; - let send_data = SendData::new(size, tracer_payload, header_tags, &endpoint); + let mut headers: HashMap<&str, String> = header_tags.into(); + headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string()); + headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string()); + headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string()); + + let strategy = RetryStrategy::default(); self.runtime.block_on(async { - let send_data_result = send_data.send().await; + // Send traces to the agent + let result = + send_with_retry(&endpoint, payload, &headers, &strategy, None).await; + + // Send telemetry for the payload sending if let Some(telemetry) = &self.telemetry { - if let Err(e) = telemetry.send(&send_data_result) { + if let Err(e) = telemetry.send(&SendPayloadTelemetry::from_retry_result( + &result, + payload_len as u64, + chunks as u64, + )) { error!("Error sending telemetry: {}", e.to_string()); } } - match send_data_result.last_result { - Ok(response) => { + + // Handle the result + match result { + Ok((response, _)) => { let status = response.status(); let body = match response.into_body().collect().await { Ok(body) => String::from_utf8_lossy(&body.to_bytes()).to_string(), @@ -692,9 +708,31 @@ impl TraceExporter { HealthMetric::Count(health_metrics::STAT_SEND_TRACES_ERRORS, 1), None, ); - Err(TraceExporterError::Io(std::io::Error::from( - std::io::ErrorKind::Other, - ))) + match err { + SendWithRetryError::Http(response, _) => { + let status = response.status(); + let body = match response.into_body().collect().await { + Ok(body) => body.to_bytes(), + Err(err) => { + error!("Error reading agent response body: {err}"); + return Err(TraceExporterError::from(err)); + } + }; + Err(TraceExporterError::Request(RequestError::new( + status, + &String::from_utf8_lossy(&body), + ))) + } + SendWithRetryError::Timeout(_) => Err(TraceExporterError::from( + io::Error::from(io::ErrorKind::TimedOut), + )), + SendWithRetryError::Network(err, _) => { + Err(TraceExporterError::from(err)) + } + SendWithRetryError::Build(_) => Err(TraceExporterError::from( + io::Error::from(io::ErrorKind::Other), + )), + } } } }) diff --git a/ddcommon/src/lib.rs b/ddcommon/src/lib.rs index 44707c018..b82d6bb7f 100644 --- a/ddcommon/src/lib.rs +++ b/ddcommon/src/lib.rs @@ -22,13 +22,28 @@ pub mod tag; pub mod header { #![allow(clippy::declare_interior_mutable_const)] use hyper::{header::HeaderName, http::HeaderValue}; + + // These strings are defined separately to be used in context where &str are used to represent + // headers (e.g. SendData) while keeping a single source of truth. + pub const DATADOG_SEND_REAL_HTTP_STATUS_STR: &str = "datadog-send-real-http-status"; + pub const DATADOG_TRACE_COUNT_STR: &str = "x-datadog-trace-count"; + pub const APPLICATION_MSGPACK_STR: &str = "application/msgpack"; + pub const APPLICATION_PROTOBUF_STR: &str = "application/x-protobuf"; + pub const DATADOG_CONTAINER_ID: HeaderName = HeaderName::from_static("datadog-container-id"); pub const DATADOG_ENTITY_ID: HeaderName = HeaderName::from_static("datadog-entity-id"); pub const DATADOG_EXTERNAL_ENV: HeaderName = HeaderName::from_static("datadog-external-env"); pub const DATADOG_TRACE_COUNT: HeaderName = HeaderName::from_static("x-datadog-trace-count"); + /// Signal to the agent to send 429 responses when a payload is dropped + /// If this is not set then the agent will always return a 200 regardless if the payload is + /// dropped. + pub const DATADOG_SEND_REAL_HTTP_STATUS: HeaderName = + HeaderName::from_static(DATADOG_SEND_REAL_HTTP_STATUS_STR); pub const DATADOG_API_KEY: HeaderName = HeaderName::from_static("dd-api-key"); pub const APPLICATION_JSON: HeaderValue = HeaderValue::from_static("application/json"); - pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static("application/msgpack"); + pub const APPLICATION_MSGPACK: HeaderValue = HeaderValue::from_static(APPLICATION_MSGPACK_STR); + pub const APPLICATION_PROTOBUF: HeaderValue = + HeaderValue::from_static(APPLICATION_PROTOBUF_STR); pub const X_DATADOG_TEST_SESSION_TOKEN: HeaderName = HeaderName::from_static("x-datadog-test-session-token"); } diff --git a/trace-utils/src/send_data/mod.rs b/trace-utils/src/send_data/mod.rs index 0159531ee..44022f2f2 100644 --- a/trace-utils/src/send_data/mod.rs +++ b/trace-utils/src/send_data/mod.rs @@ -4,25 +4,23 @@ pub mod send_data_result; use crate::send_with_retry::{send_with_retry, RetryStrategy, SendWithRetryResult}; -use crate::trace_utils::{SendDataResult, TracerHeaderTags}; +use crate::trace_utils::TracerHeaderTags; use crate::tracer_payload::TracerPayloadCollection; use anyhow::{anyhow, Context}; use datadog_trace_protobuf::pb::{AgentPayload, TracerPayload}; -use ddcommon::Endpoint; +use ddcommon::{ + header::{ + APPLICATION_MSGPACK_STR, APPLICATION_PROTOBUF_STR, DATADOG_SEND_REAL_HTTP_STATUS_STR, + DATADOG_TRACE_COUNT_STR, + }, + Endpoint, +}; use futures::stream::FuturesUnordered; use futures::StreamExt; use hyper::header::CONTENT_TYPE; +use send_data_result::SendDataResult; use std::collections::HashMap; -const HEADER_DD_TRACE_COUNT: &str = "X-Datadog-Trace-Count"; - -const HEADER_CTYPE_MSGPACK: &str = "application/msgpack"; -const HEADER_CTYPE_PROTOBUF: &str = "application/x-protobuf"; - -/// HEADER_REAL_HTTP_STATUS signals to the agent to send 429 responses when a payload is dropped -/// If this is not set then the agent will always return a 200 regardless if the payload is dropped. -const HEADER_REAL_HTTP_STATUS: &str = "Datadog-Send-Real-Http-Status"; - #[derive(Debug, Clone)] /// `SendData` is a structure that holds the data to be sent to a target endpoint. /// It includes the payloads to be sent, the size of the data, the target endpoint, @@ -88,7 +86,7 @@ impl SendData { target: &Endpoint, ) -> SendData { let mut headers: HashMap<&'static str, String> = tracer_header_tags.into(); - headers.insert(HEADER_REAL_HTTP_STATUS, "1".to_string()); + headers.insert(DATADOG_SEND_REAL_HTTP_STATUS_STR, "1".to_string()); SendData { tracer_payloads: tracer_payload, size, @@ -210,7 +208,7 @@ impl SendData { }; let mut request_headers = self.headers.clone(); - request_headers.insert(CONTENT_TYPE.as_str(), HEADER_CTYPE_PROTOBUF.to_string()); + request_headers.insert(CONTENT_TYPE.as_str(), APPLICATION_PROTOBUF_STR.to_string()); let (response, bytes_sent, chunks) = self .send_payload( @@ -221,7 +219,7 @@ impl SendData { ) .await; - result.update(response, bytes_sent, chunks).await; + result.update(response, bytes_sent, chunks); result } @@ -238,8 +236,8 @@ impl SendData { for tracer_payload in payloads { let chunks = u64::try_from(tracer_payload.chunks.len()).unwrap(); let mut headers = self.headers.clone(); - headers.insert(HEADER_DD_TRACE_COUNT, chunks.to_string()); - headers.insert(CONTENT_TYPE.as_str(), HEADER_CTYPE_MSGPACK.to_string()); + headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string()); + headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string()); let payload = match rmp_serde::to_vec_named(tracer_payload) { Ok(p) => p, @@ -252,8 +250,8 @@ impl SendData { TracerPayloadCollection::V04(payloads) => { let chunks = u64::try_from(self.tracer_payloads.size()).unwrap(); let mut headers = self.headers.clone(); - headers.insert(HEADER_DD_TRACE_COUNT, chunks.to_string()); - headers.insert(CONTENT_TYPE.as_str(), HEADER_CTYPE_MSGPACK.to_string()); + headers.insert(DATADOG_TRACE_COUNT_STR, chunks.to_string()); + headers.insert(CONTENT_TYPE.as_str(), APPLICATION_MSGPACK_STR.to_string()); let payload = match rmp_serde::to_vec_named(payloads) { Ok(p) => p, @@ -267,7 +265,7 @@ impl SendData { loop { match futures.next().await { Some((response, payload_len, chunks)) => { - result.update(response, payload_len, chunks).await; + result.update(response, payload_len, chunks); if result.last_result.is_err() { return result; } @@ -530,7 +528,7 @@ mod tests { let mock = server .mock_async(|when, then| { when.method(POST) - .header(HEADER_DD_TRACE_COUNT, "1") + .header(DATADOG_TRACE_COUNT_STR, "1") .header("Content-type", "application/msgpack") .header("datadog-meta-lang", header_tags.lang) .header( @@ -589,7 +587,7 @@ mod tests { let mock = server .mock_async(|when, then| { when.method(POST) - .header(HEADER_DD_TRACE_COUNT, "1") + .header(DATADOG_TRACE_COUNT_STR, "1") .header("Content-type", "application/msgpack") .header("datadog-meta-lang", header_tags.lang) .header( @@ -774,7 +772,7 @@ mod tests { let mock = server .mock_async(|when, then| { when.method(POST) - .header(HEADER_DD_TRACE_COUNT, "2") + .header(DATADOG_TRACE_COUNT_STR, "2") .header("Content-type", "application/msgpack") .header("datadog-meta-lang", header_tags.lang) .header( diff --git a/trace-utils/src/send_data/send_data_result.rs b/trace-utils/src/send_data/send_data_result.rs index 246cba6fd..7d78cb083 100644 --- a/trace-utils/src/send_data/send_data_result.rs +++ b/trace-utils/src/send_data/send_data_result.rs @@ -12,7 +12,7 @@ pub struct SendDataResult { pub last_result: anyhow::Result>, /// Count metric for 'trace_api.requests'. pub requests_count: u64, - /// Count metric for 'trace_api.responses'. Each key maps a different HTTP status code. + /// Count metric for 'trace_api.responses'. Each key maps a different HTTP status code. pub responses_count_per_code: HashMap, /// Count metric for 'trace_api.errors' (type: timeout). pub errors_timeout: u64, @@ -53,7 +53,7 @@ impl SendDataResult { /// * `res` - [`SendWithRetryResult`]. /// * `bytes_sent` - Number of bytes in the payload sent. /// * `chunks` - Number of chunks sent or dropped in the request. - pub(crate) async fn update(&mut self, res: SendWithRetryResult, bytes_sent: u64, chunks: u64) { + pub(crate) fn update(&mut self, res: SendWithRetryResult, bytes_sent: u64, chunks: u64) { match res { Ok((response, attempts)) => { *self @@ -66,7 +66,7 @@ impl SendDataResult { self.requests_count += u64::from(attempts); } Err(err) => match err { - SendWithRetryError::Http((response, attempts)) => { + SendWithRetryError::Http(response, attempts) => { let status_code = response.status().as_u16(); self.errors_status_code += 1; *self @@ -82,7 +82,7 @@ impl SendDataResult { self.chunks_dropped += chunks; self.requests_count += u64::from(attempts); } - SendWithRetryError::Network(attempts) => { + SendWithRetryError::Network(_, attempts) => { self.errors_network += 1; self.chunks_dropped += chunks; self.requests_count += u64::from(attempts); diff --git a/trace-utils/src/send_with_retry/mod.rs b/trace-utils/src/send_with_retry/mod.rs index 2ad9e9a8e..d91fca5b9 100644 --- a/trace-utils/src/send_with_retry/mod.rs +++ b/trace-utils/src/send_with_retry/mod.rs @@ -21,11 +21,11 @@ pub type SendWithRetryResult = Result<(Response, Attempts), SendWithRetryE #[derive(Debug)] pub enum SendWithRetryError { /// The request received an error HTTP code. - Http((Response, Attempts)), + Http(Response, Attempts), /// Treats timeout errors originated in the transport layer. Timeout(Attempts), /// Treats errors coming from networking. - Network(Attempts), + Network(hyper::Error, Attempts), /// Treats errors coming from building the request Build(Attempts), } @@ -33,9 +33,9 @@ pub enum SendWithRetryError { impl std::fmt::Display for SendWithRetryError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - SendWithRetryError::Http(_) => write!(f, "Http error code received"), + SendWithRetryError::Http(_, _) => write!(f, "Http error code received"), SendWithRetryError::Timeout(_) => write!(f, "Request timed out"), - SendWithRetryError::Network(_) => write!(f, "Network error"), + SendWithRetryError::Network(error, _) => write!(f, "Network error: {error}"), SendWithRetryError::Build(_) => { write!(f, "Failed to build request due to invalid propery") } @@ -49,7 +49,7 @@ impl SendWithRetryError { fn from_request_error(err: RequestError, request_attempt: Attempts) -> Self { match err { RequestError::Build => SendWithRetryError::Build(request_attempt), - RequestError::Network => SendWithRetryError::Network(request_attempt), + RequestError::Network(error) => SendWithRetryError::Network(error, request_attempt), RequestError::TimeoutSocket => SendWithRetryError::Timeout(request_attempt), RequestError::TimeoutApi => SendWithRetryError::Timeout(request_attempt), } @@ -59,7 +59,7 @@ impl SendWithRetryError { #[derive(Debug)] enum RequestError { Build, - Network, + Network(hyper::Error), TimeoutSocket, TimeoutApi, } @@ -69,7 +69,7 @@ impl std::fmt::Display for RequestError { match self { RequestError::TimeoutSocket => write!(f, "Socket timed out"), RequestError::TimeoutApi => write!(f, "Api timeout exhausted"), - RequestError::Network => write!(f, "Network error"), + RequestError::Network(error) => write!(f, "Network error: {error}"), RequestError::Build => write!(f, "Failed to build request due to invalid propery"), } } @@ -147,7 +147,7 @@ pub async fn send_with_retry( retry_strategy.delay(request_attempt).await; continue; } else { - return Err(SendWithRetryError::Http((response, request_attempt))); + return Err(SendWithRetryError::Http(response, request_attempt)); } } else { return Ok((response, request_attempt)); @@ -194,7 +194,7 @@ async fn send_request( if e.is_timeout() { Err(RequestError::TimeoutSocket) } else { - Err(RequestError::Network) + Err(RequestError::Network(e)) } } }, @@ -250,7 +250,7 @@ mod tests { .await; assert!(result.is_err(), "Expected an error result"); assert!( - matches!(result.unwrap_err(), SendWithRetryError::Http((_, 1))), + matches!(result.unwrap_err(), SendWithRetryError::Http(_, 1)), "Expected an http error with one attempt" ); }); @@ -345,7 +345,7 @@ mod tests { ) .await; assert!( - matches!(result.unwrap_err(), SendWithRetryError::Http((_, attempts)) if attempts == expected_retry_attempts), + matches!(result.unwrap_err(), SendWithRetryError::Http(_, attempts) if attempts == expected_retry_attempts), "Expected an error result after max retry attempts" ); });