diff --git a/dogstatsd/src/datadog.rs b/dogstatsd/src/datadog.rs
index 41c40a511..e2641738b 100644
--- a/dogstatsd/src/datadog.rs
+++ b/dogstatsd/src/datadog.rs
@@ -3,12 +3,14 @@
 //!Types to serialize data into the Datadog API
 
+use crate::flusher::ShippingError;
 use datadog_protos::metrics::SketchPayload;
 use derive_more::{Display, Into};
 use lazy_static::lazy_static;
 use protobuf::Message;
 use regex::Regex;
 use reqwest;
+use reqwest::{Client, Response};
 use serde::{Serialize, Serializer};
 use serde_json;
 use std::time::Duration;
@@ -114,16 +116,16 @@ impl MetricsIntakeUrlPrefix {
 
     #[inline]
     fn from_site(site: Site) -> Self {
-        Self(format!("https://api.{}", site))
+        Self(format!("https://api.{site}"))
     }
 }
 
 /// Interface for the `DogStatsD` metrics intake API.
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DdApi {
     api_key: String,
     metrics_intake_url_prefix: MetricsIntakeUrlPrefix,
-    client: reqwest::Client,
+    client: Option<Client>,
 }
 
 impl DdApi {
@@ -134,13 +136,11 @@
         https_proxy: Option<String>,
         timeout: Duration,
     ) -> Self {
-        let client = match Self::build_client(https_proxy, timeout) {
-            Ok(client) => client,
-            Err(e) => {
-                error!("Unable to parse proxy URL, no proxy will be used. {:?}", e);
-                reqwest::Client::new()
-            }
-        };
+        let client = build_client(https_proxy, timeout)
+            .inspect_err(|e| {
+                error!("Unable to create client {:?}", e);
+            })
+            .ok();
         DdApi {
             api_key,
             metrics_intake_url_prefix,
@@ -149,40 +149,25 @@
     }
 
     /// Ship a serialized series to the API, blocking
-    pub async fn ship_series(&self, series: &Series) {
-        let body = serde_json::to_vec(&series).expect("failed to serialize series");
-        debug!("Sending body: {:?}", &series);
-
+    pub async fn ship_series(&self, series: &Series) -> Result<Response, ShippingError> {
         let url = format!("{}/api/v2/series", &self.metrics_intake_url_prefix);
-        let resp = self
-            .client
-            .post(&url)
-            .header("DD-API-KEY", &self.api_key)
-            .header("Content-Type", "application/json")
-            .body(body)
-            .send()
-            .await;
-
-        match resp {
-            Ok(resp) => match resp.status() {
-                reqwest::StatusCode::ACCEPTED => {}
-                unexpected_status_code => {
-                    debug!(
-                        "{}: Failed to push to API: {:?}",
-                        unexpected_status_code,
-                        resp.text().await.unwrap_or_default()
-                    );
-                }
-            },
-            Err(e) => {
-                debug!("500: Failed to push to API: {:?}", e);
-            }
-        };
+        let safe_body = serde_json::to_vec(&series)
+            .map_err(|e| ShippingError::Payload(format!("Failed to serialize series: {e}")))?;
+        debug!("Sending body: {:?}", &series);
+        self.ship_data(url, safe_body, "application/json").await
     }
 
-    pub async fn ship_distributions(&self, sketches: &SketchPayload) {
+    pub async fn ship_distributions(
+        &self,
+        sketches: &SketchPayload,
+    ) -> Result<Response, ShippingError> {
         let url = format!("{}/api/beta/sketches", &self.metrics_intake_url_prefix);
+        let safe_body = sketches
+            .write_to_bytes()
+            .map_err(|e| ShippingError::Payload(format!("Failed to serialize sketches: {e}")))?;
         debug!("Sending distributions: {:?}", &sketches);
+        self.ship_data(url, safe_body, "application/x-protobuf")
+            .await
         // TODO maybe go to coded output stream if we incrementally
         // add sketch payloads to the buffer
        // something like this, but fix the utf-8 encoding issue
@@ -192,41 +177,40 @@ impl DdApi {
         //    let _ = output_stream.write_message_no_tag(&sketches);
         // TODO not working, has utf-8 encoding issue in dist-intake
         //}
-        let resp = self
+    }
+
+    async fn ship_data(
+        &self,
+        url: String,
+        body: Vec<u8>,
+        content_type: &str,
+    ) -> Result<Response, ShippingError> {
+        let client = &self
             .client
+            .as_ref()
+            .ok_or_else(|| ShippingError::Destination(None, "No client".to_string()))?;
+        let start = std::time::Instant::now();
+
+        let resp = client
             .post(&url)
             .header("DD-API-KEY", &self.api_key)
-            .header("Content-Type", "application/x-protobuf")
-            .body(sketches.write_to_bytes().expect("can't write to buffer"))
+            .header("Content-Type", content_type)
+            .body(body)
             .send()
             .await;
-        match resp {
-            Ok(resp) => match resp.status() {
-                reqwest::StatusCode::ACCEPTED => {}
-                unexpected_status_code => {
-                    debug!(
-                        "{}: Failed to push to API: {:?}",
-                        unexpected_status_code,
-                        resp.text().await.unwrap_or_default()
-                    );
-                }
-            },
-            Err(e) => {
-                debug!("500: Failed to push to API: {:?}", e);
-            }
-        };
+
+        let elapsed = start.elapsed();
+        debug!("Request to {} took {}ms", url, elapsed.as_millis());
+        resp.map_err(|e| ShippingError::Destination(e.status(), format!("Cannot reach {url}")))
     }
+}
 
-    fn build_client(
-        https_proxy: Option<String>,
-        timeout: Duration,
-    ) -> Result<reqwest::Client, reqwest::Error> {
-        let mut builder = reqwest::Client::builder().timeout(timeout);
-        if let Some(proxy) = https_proxy {
-            builder = builder.proxy(reqwest::Proxy::https(proxy)?);
-        }
-        builder.build()
+fn build_client(https_proxy: Option<String>, timeout: Duration) -> Result<Client, reqwest::Error> {
+    let mut builder = Client::builder().timeout(timeout);
+    if let Some(proxy) = https_proxy {
+        builder = builder.proxy(reqwest::Proxy::https(proxy)?);
     }
+    builder.build()
 }
 
 #[derive(Debug, Serialize, Clone, Copy)]
diff --git a/dogstatsd/src/flusher.rs b/dogstatsd/src/flusher.rs
index 98875b209..92a0d8c85 100644
--- a/dogstatsd/src/flusher.rs
+++ b/dogstatsd/src/flusher.rs
@@ -4,9 +4,10 @@
 use crate::aggregator::Aggregator;
 use crate::datadog;
 use datadog::{DdApi, MetricsIntakeUrlPrefix};
+use reqwest::{Response, StatusCode};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
-use tracing::debug;
+use tracing::{debug, error};
 
 pub struct Flusher {
     dd_api: DdApi,
@@ -50,14 +51,63 @@ impl Flusher {
 
         debug!("Flushing {n_series} series and {n_distributions} distributions");
 
-        // TODO: client timeout is for each invocation, so NxM times with N time series batches and
-        // M distro batches
-        for a_batch in all_series {
-            self.dd_api.ship_series(&a_batch).await;
-            // TODO(astuyve) retry and do not panic
+        let dd_api_clone = self.dd_api.clone();
+        let series_handle = tokio::spawn(async move {
+            for a_batch in all_series {
+                let continue_shipping =
+                    should_try_next_batch(dd_api_clone.ship_series(&a_batch).await).await;
+                if !continue_shipping {
+                    break;
+                }
+            }
+        });
+        let dd_api_clone = self.dd_api.clone();
+        let distributions_handle = tokio::spawn(async move {
+            for a_batch in all_distributions {
+                let continue_shipping =
+                    should_try_next_batch(dd_api_clone.ship_distributions(&a_batch).await).await;
+                if !continue_shipping {
+                    break;
+                }
+            }
+        });
+
+        match tokio::try_join!(series_handle, distributions_handle) {
+            Ok(_) => {
+                debug!("Successfully flushed {n_series} series and {n_distributions} distributions")
+            }
+            Err(err) => {
+                error!("Failed to flush metrics: {err}")
+            }
+        };
+    }
+}
+
+pub enum ShippingError {
+    Payload(String),
+    Destination(Option<StatusCode>, String),
+}
+
+async fn should_try_next_batch(resp: Result<Response, ShippingError>) -> bool {
+    match resp {
+        Ok(resp_payload) => match resp_payload.status() {
+            StatusCode::ACCEPTED => true,
+            unexpected_status_code => {
+                error!(
+                    "{}: Failed to push to API: {:?}",
+                    unexpected_status_code,
+                    resp_payload.text().await.unwrap_or_default()
+                );
+                true
+            }
+        },
+        Err(ShippingError::Payload(msg)) => {
+            error!("Failed to prepare payload. Data dropped: {}", msg);
+            true
         }
-        for a_batch in all_distributions {
-            self.dd_api.ship_distributions(&a_batch).await;
+        Err(ShippingError::Destination(sc, msg)) => {
+            error!("Error shipping data: {:?} {}", sc, msg);
+            false
         }
     }
 }