Skip to content

Commit

Permalink
Lazily initalize cadence client when sending stats in dogstatsd-client
Browse files Browse the repository at this point in the history
APMSP-1805. Sidecar potentially has 100s of sessions, each of which
creates dogstatsd clients. Defer initalizing new cadence clients until
we are actually ready to send. Cadence creates a new OS thread every
time a client is initialized.
  • Loading branch information
ekump committed Feb 19, 2025
1 parent 4ccb7b6 commit 6714915
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 73 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use ddcommon::header::{
};
use ddcommon::tag::Tag;
use ddcommon::{connector, tag, Endpoint};
use dogstatsd_client::{new_flusher, Client, DogStatsDAction};
use dogstatsd_client::{new, Client, DogStatsDAction};
use either::Either;
use hyper::body::HttpBody;
use hyper::http::uri::PathAndQuery;
Expand Down Expand Up @@ -921,8 +921,8 @@ impl TraceExporterBuilder {
.build()?;

let dogstatsd = self.dogstatsd_url.and_then(|u| {
new_flusher(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
// None
new(Endpoint::from_slice(&u)).ok() // If we couldn't set the endpoint return
// None
});

let base_url = self.url.as_deref().unwrap_or(DEFAULT_AGENT_URL);
Expand Down
4 changes: 4 additions & 0 deletions dogstatsd-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ serde = { version = "1.0", features = ["derive", "rc"] }
tracing = { version = "0.1", default-features = false }
anyhow = { version = "1.0" }
http = "0.2"


[dev-dependencies]
tokio = {version = "1.23", features = ["rt", "time", "test-util", "rt-multi-thread"], default-features = false}
194 changes: 128 additions & 66 deletions dogstatsd-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ddcommon::tag::Tag;
use ddcommon::Endpoint;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use tracing::{debug, error, info};
use tracing::error;

use anyhow::anyhow;
use cadence::prelude::*;
Expand All @@ -21,6 +21,7 @@ use ddcommon::connector::uds::socket_path_from_uri;
use std::net::{ToSocketAddrs, UdpSocket};
#[cfg(unix)]
use std::os::unix::net::UnixDatagram;
use std::sync::{Arc, Mutex};

// Queue with a maximum capacity of 32K elements
const QUEUE_SIZE: usize = 32 * 1024;
Expand Down Expand Up @@ -67,62 +68,57 @@ pub enum DogStatsDAction<'a, T: AsRef<str>, V: IntoIterator<Item = &'a Tag>> {
Set(T, i64, V),
}

/// A dogstatsd-client that flushes stats to a given endpoint. Use `new_flusher` to build one.
#[derive(Debug)]
/// A dogstatsd-client that flushes stats to a given endpoint.
#[derive(Debug, Default)]
pub struct Client {
client: StatsdClient,
client: Arc<Mutex<Option<StatsdClient>>>,
endpoint: Option<Endpoint>,
}

/// Build a new flusher instance pointed at the provided endpoint.
/// Returns error if the provided endpoint is not valid.
pub fn new_flusher(endpoint: Endpoint) -> anyhow::Result<Client> {
pub fn new(endpoint: Endpoint) -> anyhow::Result<Client> {
// defer initialization of the client until the first metric is sent and we definitely know the
// client is going to be used to communicate with the endpoint.
Ok(Client {
client: create_client(&endpoint)?,
endpoint: Some(endpoint),
..Default::default()
})
}

impl Client {
/// Set the destination for dogstatsd metrics, if an API Key is provided the client is disabled
/// as dogstatsd is not allowed in agentless mode. Returns an error if the provided endpoint
/// is invalid.
pub fn set_endpoint(&mut self, endpoint: Endpoint) -> anyhow::Result<()> {
self.client = match endpoint.api_key {
Some(_) => {
info!("DogStatsD is not available in agentless mode");
anyhow::bail!("DogStatsD is not available in agentless mode");
}
None => {
debug!("Updating DogStatsD endpoint to {}", endpoint.url);
create_client(&endpoint)?
}
};
Ok(())
}

/// Send a vector of DogStatsDActionOwned, this is the same as `send` except it uses the "owned"
/// version of DogStatsDAction. See the docs for DogStatsDActionOwned for details.
pub fn send_owned(&self, actions: Vec<DogStatsDActionOwned>) {
let client = &self.client;
let client_guard = match self.get_or_init_client() {
Ok(guard) => guard,
Err(e) => {
error!("Failed to get client: {}", e);
return;
}
};

for action in actions {
if let Err(err) = match action {
DogStatsDActionOwned::Count(metric, value, tags) => {
do_send(client.count_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
if let Some(client) = &*client_guard {
for action in actions {
if let Err(err) = match action {
DogStatsDActionOwned::Count(metric, value, tags) => {
do_send(client.count_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), &tags)
}
} {
error!("Error while sending metric: {}", err);
}
DogStatsDActionOwned::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), &tags)
}
DogStatsDActionOwned::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), &tags)
}
} {
error!("Error while sending metric: {}", err);
}
}
}
Expand All @@ -133,31 +129,53 @@ impl Client {
&self,
actions: Vec<DogStatsDAction<'a, T, V>>,
) {
let client = &self.client;

for action in actions {
if let Err(err) = match action {
DogStatsDAction::Count(metric, value, tags) => {
let metric_builder = client.count_with_tags(metric.as_ref(), value);
do_send(metric_builder, tags)
}
DogStatsDAction::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), tags)
let client_guard = match self.get_or_init_client() {
Ok(guard) => guard,
Err(e) => {
error!("Failed to get client: {}", e);
return;
}
};
if let Some(client) = &*client_guard {
for action in actions {
if let Err(err) = match action {
DogStatsDAction::Count(metric, value, tags) => {
let metric_builder = client.count_with_tags(metric.as_ref(), value);
do_send(metric_builder, tags)
}
DogStatsDAction::Distribution(metric, value, tags) => {
do_send(client.distribution_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Gauge(metric, value, tags) => {
do_send(client.gauge_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Histogram(metric, value, tags) => {
do_send(client.histogram_with_tags(metric.as_ref(), value), tags)
}
DogStatsDAction::Set(metric, value, tags) => {
do_send(client.set_with_tags(metric.as_ref(), value), tags)
}
} {
error!("Error while sending metric: {}", err);
}
} {
error!("Error while sending metric: {}", err);
}
}
}

fn get_or_init_client(&self) -> anyhow::Result<std::sync::MutexGuard<Option<StatsdClient>>> {
let mut client_guard = self
.client
.lock()
.map_err(|e| anyhow!("Failed to acquire dogstatsd client lock: {}", e))?;

if client_guard.is_none() {
if let Some(endpoint) = &self.endpoint {
*client_guard = Some(create_client(endpoint)?);
}
}

Ok(client_guard)
}
}

fn do_send<'m, 't, T, V: IntoIterator<Item = &'t Tag>>(
Expand Down Expand Up @@ -229,7 +247,7 @@ fn create_client(endpoint: &Endpoint) -> anyhow::Result<StatsdClient> {
#[cfg(test)]
mod test {
use crate::DogStatsDAction::{Count, Distribution, Gauge, Histogram, Set};
use crate::{create_client, new_flusher, DogStatsDActionOwned};
use crate::{create_client, new, DogStatsDActionOwned};
#[cfg(unix)]
use ddcommon::connector::uds::socket_path_to_uri;
use ddcommon::{tag, Endpoint};
Expand All @@ -244,7 +262,7 @@ mod test {
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));

let flusher = new_flusher(Endpoint::from_slice(
let flusher = new(Endpoint::from_slice(
socket.local_addr().unwrap().to_string().as_str(),
))
.unwrap();
Expand Down Expand Up @@ -333,12 +351,56 @@ mod test {
Histogram(_, _, _) => {}
Set(_, _, _) => {}
}

// TODO: when std::mem::variant_count is in stable we can do this instead
// assert_eq!(
// std::mem::variant_count::<DogStatsDActionOwned>(),
// std::mem::variant_count::<DogStatsDAction<String, Vec<&Tag>>>(),
// "DogStatsDActionOwned and DogStatsDAction should have the same number of variants,
// did you forget to update one?", );
}
use std::sync::Arc;
use std::thread;

#[tokio::test]
async fn test_thread_safety() {
let socket = net::UdpSocket::bind("127.0.0.1:0").expect("failed to bind host socket");
let _ = socket.set_read_timeout(Some(Duration::from_millis(500)));
let endpoint = Endpoint::from_slice(socket.local_addr().unwrap().to_string().as_str());
let flusher = Arc::new(new(endpoint.clone()).unwrap());

{
let client = flusher
.client
.lock()
.expect("failed to obtain lock on client");
assert!(client.is_none());
}

let tasks: Vec<_> = (0..10)
.map(|_| {
let flusher_clone = Arc::clone(&flusher);
tokio::spawn(async move {
flusher_clone.send(vec![
Count("test_count", 3, &vec![tag!("foo", "bar")]),
Count("test_neg_count", -2, &vec![]),
Distribution("test_distribution", 4.2, &vec![]),
Gauge("test_gauge", 7.6, &vec![]),
Histogram("test_histogram", 8.0, &vec![]),
Set("test_set", 9, &vec![tag!("the", "end")]),
Set("test_neg_set", -1, &vec![]),
]);

let client = flusher_clone
.client
.lock()
.expect("failed to obtain lock on client within send thread");
assert!(client.is_some());
})
})
.collect();

for task in tasks {
task.await.unwrap();
}
}
}
4 changes: 2 additions & 2 deletions sidecar/src/service/sidecar_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use datadog_live_debugger::sender::DebuggerType;
use datadog_remote_config::fetch::{ConfigInvariants, MultiTargetStats};
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
use ddcommon::tag::Tag;
use dogstatsd_client::{new_flusher, DogStatsDActionOwned};
use dogstatsd_client::{new, DogStatsDActionOwned};
use tinybytes;

type NoResponse = Ready<()>;
Expand Down Expand Up @@ -706,7 +706,7 @@ impl SidecarInterface for SidecarServer {
cfg.set_endpoint(endpoint).ok();
});
session.configure_dogstatsd(|dogstatsd| {
let d = new_flusher(config.dogstatsd_endpoint.clone()).ok();
let d = new(config.dogstatsd_endpoint.clone()).ok();
*dogstatsd = d;
});
session.modify_debugger_config(|cfg| {
Expand Down

0 comments on commit 6714915

Please sign in to comment.