Skip to content

Commit faad98b

Browse files
authored
Make runtime id configurable in the trace exporter. (#849)
1 parent b8cdf9e commit faad98b

File tree

2 files changed

+82
-26
lines changed

2 files changed

+82
-26
lines changed

data-pipeline/src/telemetry/mod.rs

+53-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub struct TelemetryClientBuilder {
2525
language_version: Option<String>,
2626
tracer_version: Option<String>,
2727
config: ddtelemetry::config::Config,
28+
runtime_id: Option<String>,
2829
}
2930

3031
impl TelemetryClientBuilder {
@@ -68,17 +69,29 @@ impl TelemetryClientBuilder {
6869
self
6970
}
7071

72+
/// Sets runtime id for the telemetry client.
73+
pub fn set_runtime_id(mut self, id: &str) -> Self {
74+
self.runtime_id = Some(id.to_string());
75+
self
76+
}
77+
7178
/// Builds the telemetry client.
7279
pub async fn build(self) -> Result<TelemetryClient, TelemetryError> {
73-
let (worker, handle) = TelemetryWorkerBuilder::new_fetch_host(
80+
let mut builder = TelemetryWorkerBuilder::new_fetch_host(
7481
self.service_name.unwrap(),
7582
self.language.unwrap(),
7683
self.language_version.unwrap(),
7784
self.tracer_version.unwrap(),
78-
)
79-
.spawn_with_config(self.config)
80-
.await
81-
.map_err(|e| TelemetryError::Builder(e.to_string()))?;
85+
);
86+
87+
if let Some(id) = self.runtime_id {
88+
builder.runtime_id = Some(id);
89+
}
90+
91+
let (worker, handle) = builder
92+
.spawn_with_config(self.config)
93+
.await
94+
.map_err(|e| TelemetryError::Builder(e.to_string()))?;
8295

8396
Ok(TelemetryClient {
8497
handle,
@@ -534,4 +547,39 @@ mod tests {
534547
let _ = client.handle.await;
535548
telemetry_srv.assert_hits_async(1).await;
536549
}
550+
551+
#[cfg_attr(miri, ignore)]
552+
#[tokio::test]
553+
async fn runtime_id_test() {
554+
let server = MockServer::start_async().await;
555+
556+
let telemetry_srv = server
557+
.mock_async(|when, then| {
558+
when.method(POST).body_contains(r#""runtime_id":"foo""#);
559+
then.status(200).body("");
560+
})
561+
.await;
562+
563+
let result = TelemetryClientBuilder::default()
564+
.set_service_name("test_service")
565+
.set_language("test_language")
566+
.set_language_version("test_language_version")
567+
.set_tracer_version("test_tracer_version")
568+
.set_url(&server.url("/"))
569+
.set_hearbeat(100)
570+
.set_runtime_id("foo")
571+
.build()
572+
.await;
573+
574+
assert!(result.is_ok());
575+
576+
let client = result.unwrap();
577+
578+
client.start().await;
579+
client.shutdown().await;
580+
let _ = client.handle.await;
581+
582+
// Check for 2 hits: app-started and app-closing.
583+
telemetry_srv.assert_hits_async(2).await;
584+
}
537585
}

data-pipeline/src/trace_exporter/mod.rs

+29-21
Original file line numberDiff line numberDiff line change
@@ -711,8 +711,9 @@ impl TraceExporter {
711711
const DEFAULT_AGENT_URL: &str = "http://127.0.0.1:8126";
712712

713713
#[derive(Default)]
714-
struct TelemetryConfig {
715-
heartbeat: u64,
714+
pub struct TelemetryConfig {
715+
pub heartbeat: u64,
716+
pub runtime_id: Option<String>,
716717
}
717718

718719
#[allow(missing_docs)]
@@ -869,12 +870,12 @@ impl TraceExporterBuilder {
869870
}
870871

871872
/// Enables sending telemetry metrics.
872-
pub fn enable_telemetry(mut self, heartbeat_ms: Option<u64>) -> Self {
873-
let mut config = TelemetryConfig::default();
874-
if let Some(interval) = heartbeat_ms {
875-
config.heartbeat = interval;
873+
pub fn enable_telemetry(mut self, cfg: Option<TelemetryConfig>) -> Self {
874+
if let Some(cfg) = cfg {
875+
self.telemetry = Some(cfg);
876+
} else {
877+
self.telemetry = Some(TelemetryConfig::default());
876878
}
877-
self.telemetry = Some(config);
878879
self
879880
}
880881

@@ -915,18 +916,19 @@ impl TraceExporterBuilder {
915916
}
916917

917918
let telemetry = if let Some(telemetry_config) = self.telemetry {
918-
Some(
919-
runtime.block_on(
920-
TelemetryClientBuilder::default()
921-
.set_language(&self.language)
922-
.set_language_version(&self.language_version)
923-
.set_service_name(&self.service)
924-
.set_tracer_version(&self.tracer_version)
925-
.set_hearbeat(telemetry_config.heartbeat)
926-
.set_url(base_url)
927-
.build(),
928-
)?,
929-
)
919+
Some(runtime.block_on(async {
920+
let mut builder = TelemetryClientBuilder::default()
921+
.set_language(&self.language)
922+
.set_language_version(&self.language_version)
923+
.set_service_name(&self.service)
924+
.set_tracer_version(&self.tracer_version)
925+
.set_hearbeat(telemetry_config.heartbeat)
926+
.set_url(base_url);
927+
if let Some(id) = telemetry_config.runtime_id {
928+
builder = builder.set_runtime_id(&id);
929+
}
930+
builder.build().await
931+
})?)
930932
} else {
931933
None
932934
};
@@ -1003,7 +1005,10 @@ mod tests {
10031005
.set_input_format(TraceExporterInputFormat::Proxy)
10041006
.set_output_format(TraceExporterOutputFormat::V07)
10051007
.set_client_computed_stats()
1006-
.enable_telemetry(Some(1000))
1008+
.enable_telemetry(Some(TelemetryConfig {
1009+
heartbeat: 1000,
1010+
runtime_id: None,
1011+
}))
10071012
.build()
10081013
.unwrap();
10091014

@@ -1749,7 +1754,10 @@ mod tests {
17491754
.set_language("nodejs")
17501755
.set_language_version("1.0")
17511756
.set_language_interpreter("v8")
1752-
.enable_telemetry(Some(100))
1757+
.enable_telemetry(Some(TelemetryConfig {
1758+
heartbeat: 100,
1759+
..Default::default()
1760+
}))
17531761
.build()
17541762
.unwrap();
17551763

0 commit comments

Comments
 (0)