|
2 | 2 |
|
3 | 3 | use std::sync::Arc;
|
4 | 4 |
|
| 5 | +use opentelemetry_proto::tonic::common::v1::any_value::Value; |
| 6 | +use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; |
5 | 7 | use prost::Message;
|
6 | 8 | use relay_dynamic_config::Feature;
|
7 | 9 | use relay_event_normalization::span::tag_extraction;
|
@@ -62,10 +64,43 @@ fn convert_traces_data(item: Item, managed_envelope: &mut TypedEnvelope<SpanGrou
|
62 | 64 | return;
|
63 | 65 | }
|
64 | 66 | };
|
65 |
| - for resource in traces_data.resource_spans { |
66 |
| - for scope in resource.scope_spans { |
67 |
| - for span in scope.spans { |
68 |
| - // TODO: resources and scopes contain attributes, should denormalize into spans? |
| 67 | + for resource_spans in traces_data.resource_spans { |
| 68 | + for scope_spans in resource_spans.scope_spans { |
| 69 | + for mut span in scope_spans.spans { |
| 70 | + // Denormalize instrumentation scope and resource attributes into every span. |
| 71 | + if let Some(ref scope) = scope_spans.scope { |
| 72 | + if !scope.name.is_empty() { |
| 73 | + span.attributes.push(KeyValue { |
| 74 | + key: "instrumentation.name".to_owned(), |
| 75 | + value: Some(AnyValue { |
| 76 | + value: Some(Value::StringValue(scope.name.clone())), |
| 77 | + }), |
| 78 | + }) |
| 79 | + } |
| 80 | + if !scope.version.is_empty() { |
| 81 | + span.attributes.push(KeyValue { |
| 82 | + key: "instrumentation.version".to_owned(), |
| 83 | + value: Some(AnyValue { |
| 84 | + value: Some(Value::StringValue(scope.version.clone())), |
| 85 | + }), |
| 86 | + }) |
| 87 | + } |
| 88 | + scope.attributes.iter().for_each(|a| { |
| 89 | + span.attributes.push(KeyValue { |
| 90 | + key: format!("instrumentation.{}", a.key), |
| 91 | + value: a.value.clone(), |
| 92 | + }); |
| 93 | + }); |
| 94 | + } |
| 95 | + if let Some(ref resource) = resource_spans.resource { |
| 96 | + resource.attributes.iter().for_each(|a| { |
| 97 | + span.attributes.push(KeyValue { |
| 98 | + key: format!("resource.{}", a.key), |
| 99 | + value: a.value.clone(), |
| 100 | + }); |
| 101 | + }); |
| 102 | + } |
| 103 | + |
69 | 104 | let Ok(payload) = serde_json::to_vec(&span) else {
|
70 | 105 | track_invalid(managed_envelope, DiscardReason::Internal);
|
71 | 106 | continue;
|
@@ -119,3 +154,140 @@ pub fn extract_transaction_span(
|
119 | 154 |
|
120 | 155 | spans.into_iter().next().and_then(Annotated::into_value)
|
121 | 156 | }
|
| 157 | + |
| 158 | +#[cfg(test)] |
| 159 | +mod tests { |
| 160 | + use std::collections::BTreeMap; |
| 161 | + |
| 162 | + use super::*; |
| 163 | + use crate::services::processor::ProcessingGroup; |
| 164 | + use crate::utils::{ManagedEnvelope, TypedEnvelope}; |
| 165 | + use crate::Envelope; |
| 166 | + use bytes::Bytes; |
| 167 | + use relay_spans::otel_trace::Span as OtelSpan; |
| 168 | + use relay_system::Addr; |
| 169 | + |
| 170 | + #[test] |
| 171 | + fn attribute_denormalization() { |
| 172 | + // Construct an OTLP trace payload with: |
| 173 | + // - a resource with one attribute, containing: |
| 174 | + // - an instrumentation scope with one attribute, containing: |
| 175 | + // - a span with one attribute |
| 176 | + let traces_data = r#" |
| 177 | + { |
| 178 | + "resourceSpans": [ |
| 179 | + { |
| 180 | + "resource": { |
| 181 | + "attributes": [ |
| 182 | + { |
| 183 | + "key": "resource_key", |
| 184 | + "value": { |
| 185 | + "stringValue": "resource_value" |
| 186 | + } |
| 187 | + } |
| 188 | + ] |
| 189 | + }, |
| 190 | + "scopeSpans": [ |
| 191 | + { |
| 192 | + "scope": { |
| 193 | + "name": "test_instrumentation", |
| 194 | + "version": "0.0.1", |
| 195 | + "attributes": [ |
| 196 | + { |
| 197 | + "key": "scope_key", |
| 198 | + "value": { |
| 199 | + "stringValue": "scope_value" |
| 200 | + } |
| 201 | + } |
| 202 | + ] |
| 203 | + }, |
| 204 | + "spans": [ |
| 205 | + { |
| 206 | + "attributes": [ |
| 207 | + { |
| 208 | + "key": "span_key", |
| 209 | + "value": { |
| 210 | + "stringValue": "span_value" |
| 211 | + } |
| 212 | + } |
| 213 | + ] |
| 214 | + } |
| 215 | + ] |
| 216 | + } |
| 217 | + ] |
| 218 | + } |
| 219 | + ] |
| 220 | + } |
| 221 | + "#; |
| 222 | + |
| 223 | + // Build an envelope containing the OTLP trace data. |
| 224 | + let bytes = |
| 225 | + Bytes::from(r#"{"dsn":"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42"}"#); |
| 226 | + let envelope = Envelope::parse_bytes(bytes).unwrap(); |
| 227 | + let (test_store, _) = Addr::custom(); |
| 228 | + let (outcome_aggregator, _) = Addr::custom(); |
| 229 | + let managed_envelope = ManagedEnvelope::new( |
| 230 | + envelope, |
| 231 | + outcome_aggregator, |
| 232 | + test_store, |
| 233 | + ProcessingGroup::Span, |
| 234 | + ); |
| 235 | + let mut typed_envelope: TypedEnvelope<_> = managed_envelope.try_into().unwrap(); |
| 236 | + let mut item = Item::new(ItemType::OtelTracesData); |
| 237 | + item.set_payload(ContentType::Json, traces_data); |
| 238 | + typed_envelope.envelope_mut().add_item(item.clone()); |
| 239 | + |
| 240 | + // Convert the OTLP trace data into `OtelSpan` item(s). |
| 241 | + convert_traces_data(item, &mut typed_envelope); |
| 242 | + |
| 243 | + // Assert that the attributes from the resource and instrumentation |
| 244 | + // scope were copied. |
| 245 | + let item = typed_envelope |
| 246 | + .envelope() |
| 247 | + .items() |
| 248 | + .find(|i| *i.ty() == ItemType::OtelSpan) |
| 249 | + .expect("converted span missing from envelope"); |
| 250 | + let attributes = serde_json::from_slice::<OtelSpan>(&item.payload()) |
| 251 | + .expect("unable to deserialize otel span") |
| 252 | + .attributes |
| 253 | + .into_iter() |
| 254 | + .map(|kv| (kv.key, kv.value.unwrap())) |
| 255 | + .collect::<BTreeMap<_, _>>(); |
| 256 | + let attribute_value = |key: &str| -> String { |
| 257 | + match attributes |
| 258 | + .get(key) |
| 259 | + .unwrap_or_else(|| panic!("attribute {} missing", key)) |
| 260 | + .to_owned() |
| 261 | + .value |
| 262 | + { |
| 263 | + Some(Value::StringValue(str)) => str, |
| 264 | + _ => panic!("attribute {} not a string", key), |
| 265 | + } |
| 266 | + }; |
| 267 | + assert_eq!( |
| 268 | + attribute_value("span_key"), |
| 269 | + "span_value".to_owned(), |
| 270 | + "original span attribute should be present" |
| 271 | + ); |
| 272 | + assert_eq!( |
| 273 | + attribute_value("instrumentation.name"), |
| 274 | + "test_instrumentation".to_owned(), |
| 275 | + "instrumentation name should be in attributes" |
| 276 | + ); |
| 277 | + assert_eq!( |
| 278 | + attribute_value("instrumentation.version"), |
| 279 | + "0.0.1".to_owned(), |
| 280 | + "instrumentation version should be in attributes" |
| 281 | + ); |
| 282 | + assert_eq!( |
| 283 | + attribute_value("resource.resource_key"), |
| 284 | + "resource_value".to_owned(), |
| 285 | + "resource attribute should be copied with prefix" |
| 286 | + ); |
| 287 | + assert_eq!( |
| 288 | + attribute_value("instrumentation.scope_key"), |
| 289 | + "scope_value".to_owned(), |
| 290 | + "instruementation scope attribute should be copied with prefix" |
| 291 | + ); |
| 292 | + } |
| 293 | +} |
0 commit comments