Skip to content

Svls 6036 respect timeouts #851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 20, 2025
116 changes: 50 additions & 66 deletions dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

//!Types to serialize data into the Datadog API

use crate::flusher::ShippingError;
use datadog_protos::metrics::SketchPayload;
use derive_more::{Display, Into};
use lazy_static::lazy_static;
use protobuf::Message;
use regex::Regex;
use reqwest;
use reqwest::{Client, Response};
use serde::{Serialize, Serializer};
use serde_json;
use std::time::Duration;
Expand Down Expand Up @@ -114,16 +116,16 @@ impl MetricsIntakeUrlPrefix {

#[inline]
fn from_site(site: Site) -> Self {
Self(format!("https://api.{}", site))
Self(format!("https://api.{site}"))
}
}

/// Interface for the `DogStatsD` metrics intake API.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DdApi {
api_key: String,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
client: reqwest::Client,
client: Option<Client>,
}

impl DdApi {
Expand All @@ -134,13 +136,11 @@ impl DdApi {
https_proxy: Option<String>,
timeout: Duration,
) -> Self {
let client = match Self::build_client(https_proxy, timeout) {
Ok(client) => client,
Err(e) => {
error!("Unable to parse proxy URL, no proxy will be used. {:?}", e);
reqwest::Client::new()
}
};
let client = build_client(https_proxy, timeout)
.inspect_err(|e| {
error!("Unable to create client {:?}", e);
})
.ok();
DdApi {
api_key,
metrics_intake_url_prefix,
Expand All @@ -149,40 +149,25 @@ impl DdApi {
}

/// Ship a serialized series to the API, blocking
pub async fn ship_series(&self, series: &Series) {
let body = serde_json::to_vec(&series).expect("failed to serialize series");
debug!("Sending body: {:?}", &series);

pub async fn ship_series(&self, series: &Series) -> Result<Response, ShippingError> {
let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix);
let resp = self
.client
.post(&url)
.header("DD-API-KEY", &self.api_key)
.header("Content-Type", "application/json")
.body(body)
.send()
.await;

match resp {
Ok(resp) => match resp.status() {
reqwest::StatusCode::ACCEPTED => {}
unexpected_status_code => {
debug!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp.text().await.unwrap_or_default()
);
}
},
Err(e) => {
debug!("500: Failed to push to API: {:?}", e);
}
};
let safe_body = serde_json::to_vec(&series)
.map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?;
debug!("Sending body: {:?}", &series);
self.ship_data(url, safe_body, "application/json").await
}

pub async fn ship_distributions(&self, sketches: &SketchPayload) {
pub async fn ship_distributions(
&self,
sketches: &SketchPayload,
) -> Result<Response, ShippingError> {
let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix);
let safe_body = sketches
.write_to_bytes()
.map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?;
debug!("Sending distributions: {:?}", &sketches);
self.ship_data(url, safe_body, "application/x-protobuf")
.await
// TODO maybe go to coded output stream if we incrementally
// add sketch payloads to the buffer
// something like this, but fix the utf-8 encoding issue
Expand All @@ -192,41 +177,40 @@ impl DdApi {
// let _ = output_stream.write_message_no_tag(&sketches);
// TODO not working, has utf-8 encoding issue in dist-intake
//}
let resp = self
}

async fn ship_data(
&self,
url: String,
body: Vec<u8>,
content_type: &str,
) -> Result<Response, ShippingError> {
let client = &self
.client
.as_ref()
.ok_or_else(|| ShippingError::Destination(None, "No client".to_string()))?;
let start = std::time::Instant::now();

let resp = client
.post(&url)
.header("DD-API-KEY", &self.api_key)
.header("Content-Type", "application/x-protobuf")
.body(sketches.write_to_bytes().expect("can't write to buffer"))
.header("Content-Type", content_type)
.body(body)
.send()
.await;
match resp {
Ok(resp) => match resp.status() {
reqwest::StatusCode::ACCEPTED => {}
unexpected_status_code => {
debug!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp.text().await.unwrap_or_default()
);
}
},
Err(e) => {
debug!("500: Failed to push to API: {:?}", e);
}
};

let elapsed = start.elapsed();
debug!("Request to {} took {}ms", url, elapsed.as_millis());
resp.map_err(|e| ShippingError::Destination(e.status(), format!("Cannot reach {url}")))
}
}

fn build_client(
https_proxy: Option<String>,
timeout: Duration,
) -> Result<reqwest::Client, reqwest::Error> {
let mut builder = reqwest::Client::builder().timeout(timeout);
if let Some(proxy) = https_proxy {
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
}
builder.build()
/// Builds the HTTP client used to talk to the metrics intake.
///
/// `timeout` is applied to the client builder; when `https_proxy` is
/// provided it is installed as the proxy for HTTPS traffic.
///
/// # Errors
///
/// Returns the underlying `reqwest::Error` if the proxy value is rejected by
/// `reqwest::Proxy::https` or if the builder fails to produce a client.
fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, reqwest::Error> {
    let base = Client::builder().timeout(timeout);
    // Only attach a proxy when the caller supplied one; otherwise keep the
    // builder exactly as configured above.
    let configured = match https_proxy {
        Some(proxy_url) => base.proxy(reqwest::Proxy::https(proxy_url)?),
        None => base,
    };
    configured.build()
}

#[derive(Debug, Serialize, Clone, Copy)]
Expand Down
66 changes: 58 additions & 8 deletions dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use crate::aggregator::Aggregator;
use crate::datadog;
use datadog::{DdApi, MetricsIntakeUrlPrefix};
use reqwest::{Response, StatusCode};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::debug;
use tracing::{debug, error};

pub struct Flusher {
dd_api: DdApi,
Expand Down Expand Up @@ -50,14 +51,63 @@ impl Flusher {

debug!("Flushing {n_series} series and {n_distributions} distributions");

// TODO: client timeout is for each invocation, so NxM times with N time series batches and
// M distro batches
for a_batch in all_series {
self.dd_api.ship_series(&a_batch).await;
// TODO(astuyve) retry and do not panic
let dd_api_clone = self.dd_api.clone();
let series_handle = tokio::spawn(async move {
for a_batch in all_series {
let continue_shipping =
should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
if !continue_shipping {
break;
}
}
});
let dd_api_clone = self.dd_api.clone();
let distributions_handle = tokio::spawn(async move {
for a_batch in all_distributions {
let continue_shipping =
should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
if !continue_shipping {
break;
}
}
});

match tokio::try_join!(series_handle, distributions_handle) {
Ok(_) => {
debug!("Successfully flushed {n_series} series and {n_distributions} distributions")
}
Err(err) => {
error!("Failed to flush metrics{err}")
}
};
}
}

/// Reasons a batch of metrics could not be shipped to the intake API.
#[derive(Debug)]
pub enum ShippingError {
    /// The payload could not be prepared (for example, serialization failed);
    /// the string describes the failure.
    Payload(String),
    /// The request to the destination failed. Carries the HTTP status code
    /// when one is available, plus a human-readable description.
    Destination(Option<StatusCode>, String),
}

impl std::fmt::Display for ShippingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Payload(msg) => write!(f, "payload error: {msg}"),
            Self::Destination(Some(status), msg) => {
                write!(f, "destination error ({status}): {msg}")
            }
            Self::Destination(None, msg) => write!(f, "destination error: {msg}"),
        }
    }
}

// Implementing `Error` lets callers propagate this type with `?` into
// `Box<dyn Error>` and similar error-handling machinery.
impl std::error::Error for ShippingError {}

/// Inspects the outcome of shipping one batch and decides whether the caller
/// should go on to ship the next batch.
///
/// Returns `true` when a response was received (any status code; statuses
/// other than `202 Accepted` are logged together with the response body) and
/// when the payload could not be prepared. Returns `false` only for
/// `ShippingError::Destination`.
async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
match resp {
Ok(resp_payload) => match resp_payload.status() {
StatusCode::ACCEPTED => true,
unexpected_status_code => {
// A response came back but was not accepted: log it and keep going.
error!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp_payload.text().await.unwrap_or_default()
);
true
}
},
Err(ShippingError::Payload(msg)) => {
// Only this batch is affected by a payload error, so later batches are still attempted.
error!("Failed to prepare payload. Data dropped: {}", msg);
true
}
// NOTE(review): the next two lines look like leftovers from the pre-change
// flush loop in this diff view rather than part of this function — confirm.
for a_batch in all_distributions {
self.dd_api.ship_distributions(&a_batch).await;
Err(ShippingError::Destination(sc, msg)) => {
// The destination could not be reached or no client exists: stop shipping.
error!("Error shipping data: {:?} {}", sc, msg);
false
}
}
}
Loading