Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Svls 6036 respect timeouts #851

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 62 additions & 67 deletions dogstatsd/src/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

//!Types to serialize data into the Datadog API

use crate::flusher::ShippingError;
use datadog_protos::metrics::SketchPayload;
use derive_more::{Display, Into};
use lazy_static::lazy_static;
use protobuf::Message;
use regex::Regex;
use reqwest;
use reqwest::{Client, Response};
use serde::{Serialize, Serializer};
use serde_json;
use std::time::Duration;
Expand Down Expand Up @@ -119,11 +121,11 @@ impl MetricsIntakeUrlPrefix {
}

/// Interface for the `DogStatsD` metrics intake API.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DdApi {
api_key: String,
metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
client: reqwest::Client,
client: Option<Client>,
}

impl DdApi {
Expand All @@ -134,11 +136,11 @@ impl DdApi {
https_proxy: Option<String>,
timeout: Duration,
) -> Self {
let client = match Self::build_client(https_proxy, timeout) {
Ok(client) => client,
let client = match build_client(https_proxy, timeout) {
Ok(client) => Some(client),
Err(e) => {
error!("Unable to parse proxy URL, no proxy will be used. {:?}", e);
reqwest::Client::new()
error!("Unable to create client {:?}", e);
None
}
};
DdApi {
Expand All @@ -149,40 +151,32 @@ impl DdApi {
}

/// Ship a serialized series to the API, blocking
pub async fn ship_series(&self, series: &Series) {
let body = serde_json::to_vec(&series).expect("failed to serialize series");
debug!("Sending body: {:?}", &series);

pub async fn ship_series(&self, series: &Series) -> Result<Response, ShippingError> {
let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix);
let resp = self
.client
.post(&url)
.header("DD-API-KEY", &self.api_key)
.header("Content-Type", "application/json")
.body(body)
.send()
.await;

match resp {
Ok(resp) => match resp.status() {
reqwest::StatusCode::ACCEPTED => {}
unexpected_status_code => {
debug!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp.text().await.unwrap_or_default()
);
}
},
Err(e) => {
debug!("500: Failed to push to API: {:?}", e);
}
};
if let Ok(safe_body) = serde_json::to_vec(&series) {
debug!("Sending body: {:?}", &series);
self.ship_data(url, safe_body, "application/json").await
} else {
Err(ShippingError::Payload(
"Failed to serialize series".to_string(),
))
}
}

pub async fn ship_distributions(&self, sketches: &SketchPayload) {
pub async fn ship_distributions(
&self,
sketches: &SketchPayload,
) -> Result<Response, ShippingError> {
let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix);
debug!("Sending distributions: {:?}", &sketches);
if let Ok(safe_body) = sketches.write_to_bytes() {
debug!("Sending distributions: {:?}", &sketches);
self.ship_data(url, safe_body, "application/x-protobuf")
.await
} else {
Err(ShippingError::Payload(
"Failed to serialize sketches".to_string(),
))
}
// TODO maybe go to coded output stream if we incrementally
// add sketch payloads to the buffer
// something like this, but fix the utf-8 encoding issue
Expand All @@ -192,43 +186,44 @@ impl DdApi {
// let _ = output_stream.write_message_no_tag(&sketches);
// TODO not working, has utf-8 encoding issue in dist-intake
//}
let resp = self
.client
.post(&url)
.header("DD-API-KEY", &self.api_key)
.header("Content-Type", "application/x-protobuf")
.body(sketches.write_to_bytes().expect("can't write to buffer"))
.send()
.await;
match resp {
Ok(resp) => match resp.status() {
reqwest::StatusCode::ACCEPTED => {}
unexpected_status_code => {
debug!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp.text().await.unwrap_or_default()
);
}
},
Err(e) => {
debug!("500: Failed to push to API: {:?}", e);
}
};
}

fn build_client(
https_proxy: Option<String>,
timeout: Duration,
) -> Result<reqwest::Client, reqwest::Error> {
let mut builder = reqwest::Client::builder().timeout(timeout);
if let Some(proxy) = https_proxy {
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
async fn ship_data(
&self,
url: String,
body: Vec<u8>,
content_type: &str,
) -> Result<Response, ShippingError> {
if let Some(client) = &self.client {
let start = std::time::Instant::now();

let resp = client
.post(&url)
.header("DD-API-KEY", &self.api_key)
.header("Content-Type", content_type)
.body(body)
.send()
.await;

let elapsed = start.elapsed();
debug!("Request to {} took {}ms", url, elapsed.as_millis());
resp.map_err(|e| {
ShippingError::Destination(e.status(), format!("Cannot reach {}", url))
})
} else {
Err(ShippingError::Destination(None, "No client".to_string()))
}
builder.build()
}
}

fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, reqwest::Error> {
let mut builder = Client::builder().timeout(timeout);
if let Some(proxy) = https_proxy {
builder = builder.proxy(reqwest::Proxy::https(proxy)?);
}
builder.build()
}

#[derive(Debug, Serialize, Clone, Copy)]
/// A single point in time
pub(crate) struct Point {
Expand Down
57 changes: 49 additions & 8 deletions dogstatsd/src/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
use crate::aggregator::Aggregator;
use crate::datadog;
use datadog::{DdApi, MetricsIntakeUrlPrefix};
use reqwest::{Response, StatusCode};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::debug;
use tracing::{debug, error};

pub struct Flusher {
dd_api: DdApi,
Expand Down Expand Up @@ -50,14 +51,54 @@ impl Flusher {

debug!("Flushing {n_series} series and {n_distributions} distributions");

// TODO: client timeout is for each invocation, so NxM times with N time series batches and
// M distro batches
for a_batch in all_series {
self.dd_api.ship_series(&a_batch).await;
// TODO(astuyve) retry and do not panic
let dd_api_clone = self.dd_api.clone();
tokio::spawn(async move {
alexgallotta marked this conversation as resolved.
Show resolved Hide resolved
for a_batch in all_series {
let continue_shipping =
should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
if !continue_shipping {
break;
}
}
});
let dd_api_clone = self.dd_api.clone();
tokio::spawn(async move {
alexgallotta marked this conversation as resolved.
Show resolved Hide resolved
for a_batch in all_distributions {
let continue_shipping =
should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
if !continue_shipping {
break;
}
}
});
}
}

pub enum ShippingError {
Payload(String),
Destination(Option<StatusCode>, String),
}

async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
match resp {
Ok(resp_payload) => match resp_payload.status() {
StatusCode::ACCEPTED => true,
unexpected_status_code => {
error!(
"{}: Failed to push to API: {:?}",
unexpected_status_code,
resp_payload.text().await.unwrap_or_default()
);
true
}
},
Err(ShippingError::Payload(msg)) => {
error!("Failed to prepare payload. Data dropped: {}", msg);
true
}
for a_batch in all_distributions {
self.dd_api.ship_distributions(&a_batch).await;
Err(ShippingError::Destination(sc, msg)) => {
error!("Error shipping data: {:?} {}", sc, msg);
false
}
}
}
Loading