Commit 6eb06fe

feat: Add GenAI agent trace enrichment to otel_traces processor (opensearch-project#6548)
* feat: add GenAI agent trace enrichment to otel_traces processor

  Always-on enrichment in otel_traces processor:
  - Normalizes vendor attributes (OpenInference, OpenLLMetry) to gen_ai.* semconv
  - Propagates select gen_ai attributes from child spans to root
  - Aggregates token counts across children to root
  - Strips conflicting flattened sub-keys

  No configuration required. No-op for non-GenAI traces.

  RFC: opensearch-project#6542

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* fix: use OTelProtoOpensearchCodec storage key format in GenAI enrichment

  OTelProtoOpensearchCodec converts span attribute keys from dot-notation to a prefixed @ format before storing them in the JacksonSpan attributes map (e.g. gen_ai.system -> span.attributes.gen_ai@system). The enrichment code was using dot-notation for lookups and writes, so it silently found nothing in production even though unit tests passed (tests bypass the codec).

  Fix: add toStorageKey()/toLogicalKey() helpers that convert between the two formats. All attribute reads and writes in enrichRootSpan, normalizeAttributes, and stripFlattenedSubkeys now use the storage format.

  Test fix: add convertToStorageFormat() helper that renames attribute keys to simulate the codec, and storageFormatRecords() that applies it before passing spans to the processor. JSON fixtures stay in dot-notation. All GenAI tests now exercise the real code path.

  E2E validated: LangGraph, Strands, CrewAI root spans now have gen_ai.* attributes propagated correctly.

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* refactor: address PR review comments on GenAiAttributeMappings and tests

  - Make MappingTarget fields private with getKey()/isWrapSlice() getters
  - Make LOOKUP_TABLE/OPERATION_NAME_VALUES private with static getters
  - Add GenAiAttributeMappingsTest with direct coverage of getters and mappings
  - Assert result/attrs non-empty in testFlattenedSubkeysStripped to prevent silent pass when collections are empty

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* test: add integration test verifying GenAI enrichment runs in OTelTraceRawProcessor.doExecute

  Adds testGenAiEnrichmentRunsDuringDoExecute to OTelTraceRawProcessorTest. Passes a span with OpenLLMetry vendor attributes through doExecute and asserts the normalized gen_ai.* attribute appears on the output span, verifying the enrichment call is wired into the processor pipeline.

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* refactor: load GenAI attribute mappings from YAML resource file

  Moves hardcoded attribute mappings from GenAiAttributeMappings.java into genai-attribute-mappings.yaml in the jar resources. Loaded at class init via Jackson YAML. This separates data from code and makes it easier to add new instrumentation library mappings without modifying Java.

  Covers OpenInference (15 mappings) and OpenLLMetry (20 mappings) profiles plus operation_name_values.

  Adds testMappingsFileExists to verify the resource file is present and readable.

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* rename: wrapSlice -> wrapAsArray for clarity in GenAI attribute mappings

  Renamed the flag that wraps a scalar string value into a single-element JSON array from wrapSlice to wrapAsArray (Java) and wrap_as_array (YAML). The new name makes the behavior immediately clear without needing context.

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* revert: remove storage-key format from GenAI enrichment

  Reverts the behavioral changes from 5ac188a which converted attribute lookups and writes to use span.attributes.* prefix with @ separators (e.g. span.attributes.gen_ai@system). The enrichment code now uses plain dot-notation keys (e.g. gen_ai.system) matching the format in the JacksonSpan attributes map at processing time.

  Removed: toStorageKey(), toLogicalKey(), STORAGE_PREFIX, and the convertToStorageFormat()/storageFormatRecords() test helpers.

  Preserves accessor refactors (getKey(), isWrapAsArray(), getLookupTable()) and test assertions from subsequent commits.

  Unit tests: 35/35 pass (full otel-trace-raw-processor module)
  E2E: Strands + LangGraph agents → local DP → OpenSearch verified

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

* fix: add license header to genai-attribute-mappings.yaml

  Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>

---------

Signed-off-by: Kyle Hounslow <kylhouns@amazon.com>
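The commit message describes wrapAsArray as wrapping a scalar string value into a single-element JSON array. A minimal sketch of that idea follows. The class and method names are hypothetical, and the escaping of backslashes and double quotes is an assumption added for illustration, not something the commit states. Control characters are not handled here.

```java
final class WrapAsArraySketch {
    private WrapAsArraySketch() {}

    /**
     * Wraps a scalar string into a single-element JSON array string.
     * Backslashes and double quotes are escaped so the result stays
     * parseable (an illustrative assumption, not taken from the commit).
     */
    static String wrapAsArray(final String value) {
        final String escaped = value
                .replace("\\", "\\\\")
                .replace("\"", "\\\"");
        return "[\"" + escaped + "\"]";
    }
}
```

For example, `WrapAsArraySketch.wrapAsArray("stop")` yields the text `["stop"]`. A production version would more likely delegate to a JSON library than escape by hand.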
1 parent fa41484 commit 6eb06fe

22 files changed: +1133 -0 lines changed
@@ -0,0 +1,125 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
+package org.opensearch.dataprepper.plugins.processor.oteltrace;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Attribute mappings from vendor-specific instrumentation libraries to OTel GenAI Semantic Conventions v1.39.0.
+ * Mappings are loaded from {@code genai-attribute-mappings.yaml} in the jar resources.
+ *
+ * @see <a href="https://opentelemetry.io/docs/specs/semconv/gen-ai/">OTel GenAI Semantic Conventions</a>
+ */
+final class GenAiAttributeMappings {
+
+    private static final Logger LOG = LoggerFactory.getLogger(GenAiAttributeMappings.class);
+    static final String MAPPINGS_FILE = "genai-attribute-mappings.yaml";
+
+    static final class MappingTarget {
+        private final String key;
+        private final boolean wrapAsArray;
+
+        MappingTarget(final String key, final boolean wrapAsArray) {
+            this.key = key;
+            this.wrapAsArray = wrapAsArray;
+        }
+
+        String getKey() {
+            return key;
+        }
+
+        boolean isWrapAsArray() {
+            return wrapAsArray;
+        }
+    }
+
+    private static final Map<String, MappingTarget> LOOKUP_TABLE;
+    private static final Map<String, String> OPERATION_NAME_VALUES;
+
+    static {
+        Map<String, MappingTarget> lookupTable = Collections.emptyMap();
+        Map<String, String> operationNameValues = Collections.emptyMap();
+        try (final InputStream is = GenAiAttributeMappings.class.getClassLoader().getResourceAsStream(MAPPINGS_FILE)) {
+            if (is == null) {
+                LOG.error("GenAI attribute mappings file not found: {}", MAPPINGS_FILE);
+            } else {
+                final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+                @SuppressWarnings("unchecked")
+                final Map<String, Object> yaml = mapper.readValue(is, Map.class);
+                lookupTable = buildLookupTable(yaml);
+                operationNameValues = buildOperationNameValues(yaml);
+            }
+        } catch (final IOException e) {
+            LOG.error("Failed to load GenAI attribute mappings from {}", MAPPINGS_FILE, e);
+        }
+        LOOKUP_TABLE = Collections.unmodifiableMap(lookupTable);
+        OPERATION_NAME_VALUES = Collections.unmodifiableMap(operationNameValues);
+    }
+
+    private GenAiAttributeMappings() {}
+
+    /** Combined lookup table for all profiles. */
+    static Map<String, MappingTarget> getLookupTable() {
+        return LOOKUP_TABLE;
+    }
+
+    /** Value mappings for gen_ai.operation.name (case-insensitive keys). */
+    static Map<String, String> getOperationNameValues() {
+        return OPERATION_NAME_VALUES;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, MappingTarget> buildLookupTable(final Map<String, Object> yaml) {
+        final Map<String, MappingTarget> table = new HashMap<>();
+        for (final Map.Entry<String, Object> profile : yaml.entrySet()) {
+            if (!(profile.getValue() instanceof List)) {
+                continue;
+            }
+            for (final Object entry : (List<?>) profile.getValue()) {
+                if (!(entry instanceof Map)) {
+                    continue;
+                }
+                final Map<String, Object> mapping = (Map<String, Object>) entry;
+                final String from = (String) mapping.get("from");
+                final String to = (String) mapping.get("to");
+                if (from == null || to == null) {
+                    continue;
+                }
+                final boolean wrapSlice = Boolean.TRUE.equals(mapping.get("wrap_as_array"));
+                table.putIfAbsent(from, new MappingTarget(to, wrapSlice));
+            }
+        }
+        return table;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, String> buildOperationNameValues(final Map<String, Object> yaml) {
+        final Object raw = yaml.get("operation_name_values");
+        if (!(raw instanceof Map)) {
+            return Collections.emptyMap();
+        }
+        final Map<String, String> values = new HashMap<>();
+        for (final Map.Entry<String, Object> entry : ((Map<String, Object>) raw).entrySet()) {
+            values.put(entry.getKey().toLowerCase(), (String) entry.getValue());
+        }
+        return values;
+    }
+}
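The loader above implies a YAML shape: each list-valued top-level key is a profile of `from`/`to` entries, map-valued keys such as `operation_name_values` are skipped, and `putIfAbsent` means the first mapping seen for a source key wins. The sketch below replays that logic on an in-memory map so it runs without Jackson. The class name, the profile names and the attribute names in `sampleYaml()` are hypothetical, since the real resource file is not shown here. It also assumes the parsed map preserves file order.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class LookupTableSketch {
    private LookupTableSketch() {}

    /** Flattens every list-valued profile into one from->to table; the first mapping wins. */
    static Map<String, String> buildLookupTable(final Map<String, Object> yaml) {
        final Map<String, String> table = new HashMap<>();
        for (final Object profile : yaml.values()) {
            if (!(profile instanceof List)) {
                continue; // e.g. operation_name_values is a map, not a profile
            }
            for (final Object entry : (List<?>) profile) {
                if (!(entry instanceof Map)) {
                    continue;
                }
                final Map<?, ?> mapping = (Map<?, ?>) entry;
                final Object from = mapping.get("from");
                final Object to = mapping.get("to");
                if (from instanceof String && to instanceof String) {
                    table.putIfAbsent((String) from, (String) to);
                }
            }
        }
        return table;
    }

    /** Hypothetical parsed-YAML content; names are illustrative only. */
    static Map<String, Object> sampleYaml() {
        final Map<String, Object> yaml = new LinkedHashMap<>();
        yaml.put("profile_a", List.of(
                Map.of("from", "llm.model_name", "to", "gen_ai.request.model")));
        yaml.put("profile_b", List.of(
                Map.of("from", "llm.model_name", "to", "gen_ai.response.model"),
                Map.of("from", "vendor.agent", "to", "gen_ai.agent.name")));
        yaml.put("operation_name_values", Map.of("LLM", "chat"));
        return yaml;
    }
}
```

With this sample input, `llm.model_name` resolves to the target from `profile_a` because that profile is iterated first, and the `operation_name_values` block contributes no entries to the table.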
@@ -0,0 +1,222 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ */
+
+package org.opensearch.dataprepper.plugins.processor.oteltrace;
+
+import org.opensearch.dataprepper.model.trace.Span;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enriches GenAI agent traces by propagating attributes from child spans to root
+ * and stripping conflicting flattened sub-keys.
+ *
+ * @see <a href="https://opentelemetry.io/docs/specs/semconv/gen-ai/">OTel GenAI Semantic Conventions v1.39.0</a>
+ */
+final class GenAiEnrichmentHelper {
+    private static final Logger LOG = LoggerFactory.getLogger(GenAiEnrichmentHelper.class);
+
+    private static final String GEN_AI_SYSTEM_KEY = "gen_ai.system";
+    private static final String GEN_AI_PROVIDER_NAME_KEY = "gen_ai.provider.name";
+    private static final String GEN_AI_AGENT_NAME_KEY = "gen_ai.agent.name";
+    private static final String GEN_AI_REQUEST_MODEL_KEY = "gen_ai.request.model";
+    private static final String GEN_AI_INPUT_TOKENS_KEY = "gen_ai.usage.input_tokens";
+    private static final String GEN_AI_OUTPUT_TOKENS_KEY = "gen_ai.usage.output_tokens";
+    private static final String ATTRIBUTES_PREFIX = "attributes/";
+
+    private static final String[] PROPAGATED_STRING_KEYS = {
+        GEN_AI_SYSTEM_KEY, GEN_AI_PROVIDER_NAME_KEY, GEN_AI_AGENT_NAME_KEY, GEN_AI_REQUEST_MODEL_KEY
+    };
+
+    private static final String[] FLATTENED_PARENT_KEYS = {
+        "llm.input_messages", "llm.output_messages",
+        "gen_ai.prompt", "gen_ai.completion"
+    };
+
+    private GenAiEnrichmentHelper() {}
+
+    /**
+     * Enriches a batch of spans: normalizes vendor attributes, strips flattened sub-keys,
+     * then propagates select gen_ai attributes from children to root spans grouped by traceId.
+     */
+    static void enrichBatch(final List<Span> spans) {
+        for (final Span span : spans) {
+            normalizeAttributes(span);
+            stripFlattenedSubkeys(span);
+        }
+
+        final Map<String, List<Span>> spansByTrace = new HashMap<>();
+        for (final Span span : spans) {
+            spansByTrace.computeIfAbsent(span.getTraceId(), k -> new ArrayList<>()).add(span);
+        }
+
+        for (final List<Span> traceSpans : spansByTrace.values()) {
+            Span rootSpan = null;
+            final List<Span> children = new ArrayList<>();
+            for (final Span span : traceSpans) {
+                if (isRootSpan(span)) {
+                    rootSpan = span;
+                } else {
+                    children.add(span);
+                }
+            }
+            if (rootSpan != null && !children.isEmpty()) {
+                enrichRootSpan(rootSpan, children);
+            }
+        }
+    }
+
+    private static boolean isRootSpan(final Span span) {
+        final String parentSpanId = span.getParentSpanId();
+        return parentSpanId == null || parentSpanId.isEmpty()
+                || "0000000000000000".equals(parentSpanId);
+    }
+
+    /**
+     * Propagates gen_ai.* string attributes and aggregated token counts from children to root.
+     * Skips if root already has the attributes.
+     */
+    static void enrichRootSpan(final Span rootSpan, final Collection<Span> children) {
+        final Map<String, Object> rootAttrs = rootSpan.getAttributes();
+
+        final Map<String, String> toPropagate = new HashMap<>();
+        for (final String key : PROPAGATED_STRING_KEYS) {
+            if (rootAttrs == null || !rootAttrs.containsKey(key)) {
+                toPropagate.put(key, null);
+            }
+        }
+        final boolean rootHasTokens = rootAttrs != null && rootAttrs.containsKey(GEN_AI_INPUT_TOKENS_KEY);
+
+        if (toPropagate.isEmpty() && rootHasTokens) {
+            return;
+        }
+
+        long totalInputTokens = 0;
+        long totalOutputTokens = 0;
+        boolean foundTokens = false;
+
+        for (final Span child : children) {
+            final Map<String, Object> attrs = child.getAttributes();
+            if (attrs == null) {
+                continue;
+            }
+
+            for (final Map.Entry<String, String> entry : toPropagate.entrySet()) {
+                if (entry.getValue() == null && attrs.containsKey(entry.getKey())) {
+                    entry.setValue((String) attrs.get(entry.getKey()));
+                }
+            }
+
+            final Number inputTokens = (Number) attrs.get(GEN_AI_INPUT_TOKENS_KEY);
+            final Number outputTokens = (Number) attrs.get(GEN_AI_OUTPUT_TOKENS_KEY);
+            if (inputTokens != null || outputTokens != null) {
+                foundTokens = true;
+                if (inputTokens != null) {
+                    totalInputTokens += inputTokens.longValue();
+                }
+                if (outputTokens != null) {
+                    totalOutputTokens += outputTokens.longValue();
+                }
+            }
+        }
+
+        for (final Map.Entry<String, String> entry : toPropagate.entrySet()) {
+            if (entry.getValue() != null) {
+                rootSpan.put(ATTRIBUTES_PREFIX + entry.getKey(), entry.getValue());
+                LOG.debug("Propagated {} = {} to root span {}", entry.getKey(), entry.getValue(), rootSpan.getSpanId());
+            }
+        }
+
+        if (!rootHasTokens && foundTokens) {
+            rootSpan.put(ATTRIBUTES_PREFIX + GEN_AI_INPUT_TOKENS_KEY, totalInputTokens);
+            rootSpan.put(ATTRIBUTES_PREFIX + GEN_AI_OUTPUT_TOKENS_KEY, totalOutputTokens);
+            LOG.debug("Aggregated tokens (input={}, output={}) to root span {}", totalInputTokens, totalOutputTokens, rootSpan.getSpanId());
+        }
+    }
+
+    /**
+     * Normalizes vendor-specific attributes to OTel GenAI Semantic Conventions.
+     * Copies values to the standard gen_ai.* key, keeping originals intact.
+     * Skips if the target attribute already exists on the span.
+     */
+    static void normalizeAttributes(final Span span) {
+        final Map<String, Object> attrs = span.getAttributes();
+        if (attrs == null) {
+            return;
+        }
+
+        for (final Map.Entry<String, Object> entry : new ArrayList<>(attrs.entrySet())) {
+            final GenAiAttributeMappings.MappingTarget target =
+                    GenAiAttributeMappings.getLookupTable().get(entry.getKey());
+            if (target == null) {
+                continue;
+            }
+            if (attrs.containsKey(target.getKey())) {
+                continue;
+            }
+
+            Object value = entry.getValue();
+            if (value instanceof String) {
+                final String strVal = (String) value;
+                if ("gen_ai.operation.name".equals(target.getKey())) {
+                    final String mapped = GenAiAttributeMappings.getOperationNameValues()
+                            .get(strVal.toLowerCase());
+                    if (mapped != null) {
+                        value = mapped;
+                    }
+                }
+                if (target.isWrapAsArray()) {
+                    value = "[\"" + value + "\"]";
+                }
+            }
+
+            span.put(ATTRIBUTES_PREFIX + target.getKey(), value);
+        }
+    }
+
+    /**
+     * Removes flattened sub-keys (e.g. "llm.input_messages.0.message.content") that
+     * conflict with parent string values (e.g. "llm.input_messages"), preventing OpenSearch mapping failures.
+     * Only strips when the parent string value exists. If only sub-keys exist, they are preserved.
+     */
+    static void stripFlattenedSubkeys(final Span span) {
+        final Map<String, Object> attrs = span.getAttributes();
+        if (attrs == null) {
+            return;
+        }
+
+        final List<String> toRemove = new ArrayList<>();
+        for (final String parentKey : FLATTENED_PARENT_KEYS) {
+            if (!attrs.containsKey(parentKey)) {
+                continue;
+            }
+            for (final String key : attrs.keySet()) {
+                if (key.startsWith(parentKey + ".") && key.length() > parentKey.length() + 1
+                        && Character.isDigit(key.charAt(parentKey.length() + 1))) {
+                    toRemove.add(key);
+                }
+            }
+        }
+
+        for (final String key : toRemove) {
+            try {
+                span.delete(ATTRIBUTES_PREFIX + key);
+            } catch (final Exception e) {
+                LOG.warn("Failed to delete flattened sub-key {}: {}", key, e.getMessage());
+            }
+        }
+    }
+}
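The root-span step in the helper above can be hard to follow through the Span API, so here is a minimal sketch of the same rules on plain maps: the root keeps any value it already has, the first child carrying a string attribute supplies it otherwise, and token counts are summed across children only when the root has no input-token attribute. The class name is hypothetical. Unlike the diff, this sketch uses `instanceof` checks instead of casts, which is a deliberate simplification and not the commit's behavior.

```java
import java.util.List;
import java.util.Map;

final class RootEnrichmentSketch {
    static final String INPUT_TOKENS = "gen_ai.usage.input_tokens";
    static final String OUTPUT_TOKENS = "gen_ai.usage.output_tokens";
    static final String[] STRING_KEYS = {
        "gen_ai.system", "gen_ai.provider.name", "gen_ai.agent.name", "gen_ai.request.model"
    };

    private RootEnrichmentSketch() {}

    /** Mutates {@code root}: fills missing string keys and summed token counts from children. */
    static void enrichRoot(final Map<String, Object> root, final List<Map<String, Object>> children) {
        final boolean rootHasTokens = root.containsKey(INPUT_TOKENS);
        long totalInput = 0;
        long totalOutput = 0;
        boolean foundTokens = false;

        for (final Map<String, Object> child : children) {
            for (final String key : STRING_KEYS) {
                final Object value = child.get(key);
                if (value instanceof String) {
                    root.putIfAbsent(key, value); // root value, then first child, wins
                }
            }
            final Object in = child.get(INPUT_TOKENS);
            final Object out = child.get(OUTPUT_TOKENS);
            if (in instanceof Number) {
                totalInput += ((Number) in).longValue();
                foundTokens = true;
            }
            if (out instanceof Number) {
                totalOutput += ((Number) out).longValue();
                foundTokens = true;
            }
        }

        if (!rootHasTokens && foundTokens) {
            root.put(INPUT_TOKENS, totalInput);
            root.put(OUTPUT_TOKENS, totalOutput);
        }
    }
}
```

Because the totals are written as `long`, a root that receives aggregated counts ends up with `Long` values even when the children reported `Integer` values.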

data-prepper-plugins/otel-trace-raw-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/oteltrace/OTelTraceRawProcessor.java

Lines changed: 3 additions & 0 deletions
@@ -99,6 +99,9 @@ public Collection<Record<Span>> doExecute(Collection<Record<Span>> records) {
 
         processedSpans.addAll(getTracesToFlushByGarbageCollection());
 
+        // Enrich GenAI agent traces (propagate select gen_ai attributes to root, aggregate tokens, strip conflicting sub-keys)
+        GenAiEnrichmentHelper.enrichBatch(processedSpans);
+
         // Derive server span attributes (fault, error, operation, environment)
         OTelSpanDerivationUtil.deriveServerSpanAttributes(processedSpans);
