Skip to content

Commit afd727c

Browse files
committed
initial commit for llo handler
1 parent 15f625e commit afd727c

File tree

2 files changed

+263
-2
lines changed

2 files changed

+263
-2
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
# SPDX-License-Identifier: Apache-2.0
33

4-
import os
5-
64
from typing import Dict, Optional, Sequence
75

86
from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
7+
from amazon.opentelemetry.distro.llo_handler import LLOHandler
98
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
109
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
1110
from opentelemetry.exporter.otlp.proto.http import Compression
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
import logging
2+
import re
3+
4+
from typing import Any, Dict, List, Sequence
5+
6+
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
7+
8+
from opentelemetry.attributes import BoundedAttributes
9+
from opentelemetry._events import Event
10+
from opentelemetry.sdk._logs import LoggerProvider
11+
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
12+
from opentelemetry.sdk._events import EventLoggerProvider
13+
from opentelemetry.sdk.trace import ReadableSpan
14+
15+
_logger = logging.getLogger(__name__)
16+
17+
class LLOHandler:
18+
"""
19+
Utility class for handling Large Language Objects (LLO).
20+
This class identifies LLO attributes, emits them as log records, and filters
21+
them out from telemetry data.
22+
"""
23+
def __init__(self, logs_exporter: OTLPAwsLogExporter):
24+
self._logs_exporter = logs_exporter
25+
self._logger_provider = LoggerProvider()
26+
self._logger_provider.add_log_record_processor(
27+
BatchLogRecordProcessor(self._logs_exporter)
28+
)
29+
30+
self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider)
31+
self._event_logger = self._event_logger_provider.get_event_logger("gen_ai.events")
32+
33+
self._exact_match_patterns = []
34+
self._regex_match_patterns = [
35+
r"^gen_ai\.prompt\.\d+\.content$"
36+
]
37+
38+
39+
def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
40+
"""
41+
Perform LLO processing for each span:
42+
1. Emitting LLO attributes as Gen AI Events
43+
2. Filtering out LLO attributes from the span
44+
"""
45+
modified_spans = []
46+
47+
for span in spans:
48+
self._emit_llo_attributes(span, span.attributes)
49+
updated_attributes = self._filter_attributes(span.attributes)
50+
51+
if isinstance(span.attributes, BoundedAttributes):
52+
span._attributes = BoundedAttributes(
53+
maxlen=span.attributes.maxlen,
54+
attributes=updated_attributes,
55+
immutable=span.attributes._immutable,
56+
max_value_len=span.attributes.max_value_len
57+
)
58+
else:
59+
span._attributes = updated_attributes
60+
61+
modified_spans.append(span)
62+
63+
return modified_spans
64+
65+
66+
def _emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any]) -> None:
67+
"""
68+
Extract, transform, and emit LLO attributes as Gen AI Events
69+
"""
70+
all_events = []
71+
all_events.extend(self._extract_gen_ai_prompt_events(span, attributes))
72+
73+
for event in all_events:
74+
self._event_logger.emit(event)
75+
_logger.debug(f"Emitted Gen AI Event: {event.name}")
76+
77+
78+
def _filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
79+
"""
80+
Filter out attributes that contain LLO from the span's attributes.
81+
"""
82+
filtered_attributes = {}
83+
84+
for key, value in attributes.items():
85+
if not self._is_llo_attribute(key):
86+
filtered_attributes[key] = value
87+
88+
return filtered_attributes
89+
90+
91+
def _is_llo_attribute(self, key: str) -> bool:
92+
"""
93+
Determine if a span attribute contains an LLO based on its key.
94+
"""
95+
return (
96+
any(pattern == key for pattern in self._exact_match_patterns) or
97+
any(re.match(pattern, key) for pattern in self._regex_match_patterns)
98+
)
99+
100+
101+
def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str, Any]) -> List[Event]:
102+
"""
103+
Extract gen_ai prompt events from attributes. Each item `gen_ai.prompt.{n}.content`
104+
maps has an associated `gen_ai.prompt.{n}.role` that we map to an Event type.
105+
106+
`gen_ai.prompt.{n}.role`:
107+
1. `system` -> `gen_ai.system.message` Event
108+
2. `user` -> `gen_ai.user.message` Event
109+
3. `assistant` -> `gen_ai.assistant.message` Event
110+
4. `function` -> `gen_ai.tool.message` Event
111+
5. `unknown` -> `gen_ai.tool.message` Event - This is a best attempt mapping
112+
"""
113+
events = []
114+
span_ctx = span.context
115+
gen_ai_system = span.attributes.get("gen_ai.system", "unknown")
116+
117+
prompt_timestamp = span.start_time
118+
prompt_content_pattern = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$")
119+
120+
for key, value in attributes.items():
121+
match = prompt_content_pattern.match(key)
122+
if not match:
123+
continue
124+
125+
index = match.group(1)
126+
role_key = f"gen_ai.prompt.{index}.role"
127+
role = attributes.get(role_key, "unknown")
128+
129+
event_attributes = {
130+
"gen_ai.system": gen_ai_system,
131+
"original_attribute": key
132+
}
133+
134+
event = None
135+
if role == "system":
136+
event = self._get_gen_ai_system_message_event(
137+
span_ctx,
138+
prompt_timestamp,
139+
event_attributes,
140+
value,
141+
role
142+
)
143+
elif role == "user":
144+
event = self._get_gen_ai_user_message_event(
145+
span_ctx,
146+
prompt_timestamp,
147+
event_attributes,
148+
value,
149+
role
150+
)
151+
elif role == "assistant":
152+
event = self._get_gen_ai_assistant_message_event(
153+
span_ctx,
154+
prompt_timestamp,
155+
event_attributes,
156+
value,
157+
role,
158+
)
159+
elif role in ["function", "unknown"]:
160+
# TODO: Need to define a custom event and emit
161+
pass
162+
163+
if event:
164+
events.append(event)
165+
166+
return events
167+
168+
def _get_gen_ai_system_message_event(
169+
self,
170+
span_ctx,
171+
timestamp,
172+
event_attributes,
173+
content,
174+
role
175+
):
176+
"""
177+
Create and return a `gen_ai.system.message` Event.
178+
"""
179+
body = {"content": content}
180+
181+
# According to OTel spec, this body field is only required if available and not equal to `system`.
182+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/#event-gen_aisystemmessage
183+
if role != "system":
184+
body["role"] = role
185+
186+
return Event(
187+
name="gen_ai.system.message",
188+
timestamp=timestamp,
189+
attributes=event_attributes,
190+
body=body,
191+
trace_id=span_ctx.trace_id,
192+
span_id=span_ctx.span_id,
193+
trace_flags=span_ctx.trace_flags,
194+
)
195+
196+
def _get_gen_ai_user_message_event(
197+
self,
198+
span_ctx,
199+
timestamp,
200+
event_attributes,
201+
content,
202+
role
203+
):
204+
"""
205+
Create and return a `gen_ai.user.message` Event.
206+
"""
207+
body = {"content": content}
208+
209+
# According to OTel spec, this body field is only required if available and not equal to `user`.
210+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/#event-gen_aiusermessage
211+
if role != "user":
212+
body["role"] = role
213+
214+
return Event(
215+
name="gen_ai.user.message",
216+
timestamp=timestamp,
217+
attributes=event_attributes,
218+
body=body,
219+
trace_id=span_ctx.trace_id,
220+
span_id=span_ctx.span_id,
221+
trace_flags=span_ctx.trace_flags,
222+
)
223+
224+
def _get_gen_ai_assistant_message_event(
225+
self,
226+
span_ctx,
227+
timestamp,
228+
event_attributes,
229+
content,
230+
role,
231+
):
232+
"""
233+
Create and return a `gen_ai.assistant.message` Event.
234+
235+
According to the OTel spec, assistant message events may contain tool_calls,
236+
if available. In our implementation, tool call information is not available
237+
directly in the span attributes we're processing - it exists in separate
238+
related spans.
239+
240+
Thus without implementing complex span correlation, we cannot reliable extract
241+
tool_calls for assistant messages. This limitation is acceptable per the OTel
242+
spec since tool_calls are only required when available. However, this will
243+
lead to reduction in data quality.
244+
245+
ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/#event-gen_aiassistantmessage
246+
"""
247+
body = {"content": content}
248+
249+
# According to the OTel spec, this body field is only required if available and not equal to `assistant`.
250+
# ref: https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-events/#event-gen_aiassistantmessage
251+
if role != "assistant":
252+
body["role"] = role
253+
254+
return Event(
255+
name="gen_ai.assistant.message",
256+
timestamp=timestamp,
257+
attributes=event_attributes,
258+
body=body,
259+
trace_id=span_ctx.trace_id,
260+
span_id=span_ctx.span_id,
261+
trace_flags=span_ctx.trace_flags,
262+
)

0 commit comments

Comments
 (0)