12
12
GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
13
13
GEN_AI_USER_MESSAGE = "gen_ai.user.message"
14
14
GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
15
+ TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
16
+ TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output"
15
17
16
18
_logger = logging .getLogger (__name__ )
17
19
20
+
18
21
class LLOHandler :
19
22
"""
20
23
Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans.
@@ -24,6 +27,7 @@ class LLOHandler:
24
27
2. Extracts and transforms these attributes into OpenTelemetry Gen AI Events
25
28
3. Filters LLO from spans
26
29
"""
30
+
27
31
def __init__ (self , logger_provider : LoggerProvider ):
28
32
"""
29
33
Initialize an LLOHandler with the specified logger provider.
@@ -37,12 +41,15 @@ def __init__(self, logger_provider: LoggerProvider):
37
41
self ._event_logger_provider = EventLoggerProvider (logger_provider = self ._logger_provider )
38
42
self ._event_logger = self ._event_logger_provider .get_event_logger ("gen_ai.events" )
39
43
40
- self ._exact_match_patterns = []
44
+ self ._exact_match_patterns = [
45
+ "traceloop.entity.input" ,
46
+ "traceloop.entity.output" ,
47
+ ]
41
48
self ._regex_match_patterns = [
42
- r"^gen_ai\.prompt\.\d+\.content$"
49
+ r"^gen_ai\.prompt\.\d+\.content$" ,
50
+ r"^gen_ai\.completion\.\d+\.content$" ,
43
51
]
44
52
45
-
46
53
def process_spans (self , spans : Sequence [ReadableSpan ]) -> List [ReadableSpan ]:
47
54
"""
48
55
Performs LLO processing for each span:
@@ -66,7 +73,7 @@ def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
66
73
maxlen = span .attributes .maxlen ,
67
74
attributes = updated_attributes ,
68
75
immutable = span .attributes ._immutable ,
69
- max_value_len = span .attributes .max_value_len
76
+ max_value_len = span .attributes .max_value_len ,
70
77
)
71
78
else :
72
79
span ._attributes = updated_attributes
@@ -75,7 +82,6 @@ def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
75
82
76
83
return modified_spans
77
84
78
-
79
85
def _emit_llo_attributes (self , span : ReadableSpan , attributes : Dict [str , Any ]) -> None :
80
86
"""
81
87
Collects the Gen AI Events for each LLO attribute in the span and emits them
@@ -90,12 +96,13 @@ def _emit_llo_attributes(self, span: ReadableSpan, attributes: Dict[str, Any]) -
90
96
"""
91
97
all_events = []
92
98
all_events .extend (self ._extract_gen_ai_prompt_events (span , attributes ))
99
+ all_events .extend (self ._extract_gen_ai_completion_events (span , attributes ))
100
+ all_events .extend (self ._extract_traceloop_events (span , attributes ))
93
101
94
102
for event in all_events :
95
103
self ._event_logger .emit (event )
96
104
_logger .debug (f"Emitted Gen AI Event: { event .name } " )
97
105
98
-
99
106
def _filter_attributes (self , attributes : Dict [str , Any ]) -> Dict [str , Any ]:
100
107
"""
101
108
Filter out attributes that contain LLO from the span's attributes. This
@@ -116,7 +123,6 @@ def _filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
116
123
117
124
return filtered_attributes
118
125
119
-
120
126
def _is_llo_attribute (self , key : str ) -> bool :
121
127
"""
122
128
Determine if a span attribute contains an LLO based on its key name.
@@ -131,12 +137,10 @@ def _is_llo_attribute(self, key: str) -> bool:
131
137
Returns:
132
138
bool: True if the key matches an LLO pattern, False otherwise
133
139
"""
134
- return (
135
- any (pattern == key for pattern in self ._exact_match_patterns ) or
136
- any (re .match (pattern , key ) for pattern in self ._regex_match_patterns )
140
+ return any (pattern == key for pattern in self ._exact_match_patterns ) or any (
141
+ re .match (pattern , key ) for pattern in self ._regex_match_patterns
137
142
)
138
143
139
-
140
144
def _extract_gen_ai_prompt_events (self , span : ReadableSpan , attributes : Dict [str , Any ]) -> List [Event ]:
141
145
"""
142
146
Extract gen_ai prompt events from attributes. Each item `gen_ai.prompt.{n}.content`
@@ -173,15 +177,9 @@ def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str
173
177
role_key = f"gen_ai.prompt.{ index } .role"
174
178
role = attributes .get (role_key , "unknown" )
175
179
176
- event_attributes = {
177
- "gen_ai.system" : gen_ai_system ,
178
- "original_attribute" : key
179
- }
180
+ event_attributes = {"gen_ai.system" : gen_ai_system , "original_attribute" : key }
180
181
181
- body = {
182
- "content" : value ,
183
- "role" : role
184
- }
182
+ body = {"content" : value , "role" : role }
185
183
186
184
event = None
187
185
if role == "system" :
@@ -190,46 +188,104 @@ def _extract_gen_ai_prompt_events(self, span: ReadableSpan, attributes: Dict[str
190
188
span_ctx = span_ctx ,
191
189
timestamp = prompt_timestamp ,
192
190
attributes = event_attributes ,
193
- body = body
191
+ body = body ,
194
192
)
195
193
elif role == "user" :
196
194
event = self ._get_gen_ai_event (
197
195
name = GEN_AI_USER_MESSAGE ,
198
196
span_ctx = span_ctx ,
199
197
timestamp = prompt_timestamp ,
200
198
attributes = event_attributes ,
201
- body = body
199
+ body = body ,
202
200
)
203
201
elif role == "assistant" :
204
202
event = self ._get_gen_ai_event (
205
203
name = GEN_AI_ASSISTANT_MESSAGE ,
206
204
span_ctx = span_ctx ,
207
205
timestamp = prompt_timestamp ,
208
206
attributes = event_attributes ,
209
- body = body
207
+ body = body ,
210
208
)
211
209
elif role in ["function" , "unknown" ]:
212
210
event = self ._get_gen_ai_event (
213
211
name = f"gen_ai.{ gen_ai_system } .message" ,
214
212
span_ctx = span_ctx ,
215
213
timestamp = prompt_timestamp ,
216
214
attributes = event_attributes ,
217
- body = body
215
+ body = body ,
218
216
)
219
217
220
218
if event :
221
219
events .append (event )
222
220
223
221
return events
224
222
225
- def _get_gen_ai_event (
226
- self ,
227
- name ,
228
- span_ctx ,
229
- timestamp ,
230
- attributes ,
231
- body
232
- ):
223
+ def _extract_gen_ai_completion_events (self , span : ReadableSpan , attributes : Dict [str , Any ]) -> List [Event ]:
224
+ events = []
225
+ span_ctx = span .context
226
+ gen_ai_system = span .attributes .get ("gen_ai.system" , "unknown" )
227
+
228
+ completion_timestamp = span .end_time
229
+
230
+ completion_content_pattern = re .compile (r"^gen_ai\.completion\.(\d+)\.content$" )
231
+
232
+ for key , value in attributes .items ():
233
+ match = completion_content_pattern .match (key )
234
+ if not match :
235
+ continue
236
+
237
+ index = match .group (1 )
238
+ role_key = f"gen_ai.completion.{ index } .role"
239
+ role = attributes .get (role_key , "unknown" )
240
+
241
+ event_attributes = {"gen_ai.system" : gen_ai_system , "original_attribute" : key }
242
+
243
+ body = {"content" : value , "role" : role }
244
+
245
+ event = None
246
+ if role == "assistant" :
247
+ event = self ._get_gen_ai_event (
248
+ name = GEN_AI_ASSISTANT_MESSAGE ,
249
+ span_ctx = span_ctx ,
250
+ timestamp = completion_timestamp ,
251
+ attributes = event_attributes ,
252
+ body = body ,
253
+ )
254
+ else :
255
+ event = self ._get_gen_ai_event (
256
+ name = f"gen_ai.{ gen_ai_system } .message" ,
257
+ span_ctx = span_ctx ,
258
+ timestamp = completion_timestamp ,
259
+ attributes = event_attributes ,
260
+ body = body ,
261
+ )
262
+
263
+ if event :
264
+ events .append (event )
265
+
266
+ return events
267
+
268
+ def _extract_traceloop_events (self , span : ReadableSpan , attributes : Dict [str , Any ]) -> List [Event ]:
269
+ events = []
270
+ span_ctx = span .context
271
+ gen_ai_system = span .attributes .get ("traceloop.entity.name" , "unknown" )
272
+
273
+ traceloop_attrs = [(TRACELOOP_ENTITY_INPUT , span .start_time ), (TRACELOOP_ENTITY_OUTPUT , span .end_time )]
274
+
275
+ for attr_key , timestamp in traceloop_attrs :
276
+ if attr_key in attributes :
277
+ event = self ._get_gen_ai_event (
278
+ name = f"gen_ai.{ gen_ai_system } .message" ,
279
+ span_ctx = span_ctx ,
280
+ timestamp = timestamp ,
281
+ attributes = {"gen_ai.system" : gen_ai_system , "original_attribute" : attr_key },
282
+ body = {"content" : attributes [attr_key ]},
283
+ )
284
+ events .append (event )
285
+
286
+ return events
287
+
288
+ def _get_gen_ai_event (self , name , span_ctx , timestamp , attributes , body ):
233
289
"""
234
290
Create and return a Gen AI Event with the provided parameters.
235
291
@@ -250,5 +306,5 @@ def _get_gen_ai_event(
250
306
body = body ,
251
307
trace_id = span_ctx .trace_id ,
252
308
span_id = span_ctx .span_id ,
253
- trace_flags = span_ctx .trace_flags
309
+ trace_flags = span_ctx .trace_flags ,
254
310
)
0 commit comments