Skip to content

Commit 15bf572

Browse files
committed
working wip
1 parent 1e1626a commit 15bf572

File tree

1 file changed

+33
-24
lines changed

1 file changed

+33
-24
lines changed

ddtrace/llmobs/_writer.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def __init__(
111111
) -> None:
112112
super(BaseLLMObsWriter, self).__init__(interval=interval)
113113
self._lock = forksafe.RLock()
114-
self._buffer: List[Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent]] = []
114+
self._buffer: List[str] = []
115115
self._buffer_size: int = 0
116116
self._timeout: float = timeout
117117
self._api_key: str = _api_key or config._dd_api_key
@@ -148,7 +148,7 @@ def stop(self, timeout=None):
148148
def on_shutdown(self):
149149
self.periodic()
150150

151-
def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], event_size: int) -> None:
151+
def _enqueue(self, encoded_event: str, event_size: int) -> None:
152152
"""Internal shared logic of enqueuing events to be submitted to LLM Observability."""
153153
with self._lock:
154154
if len(self._buffer) >= self.BUFFER_LIMIT:
@@ -160,7 +160,7 @@ def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], e
160160
if self._buffer_size + event_size > EVP_PAYLOAD_SIZE_LIMIT:
161161
logger.debug("manually flushing buffer because queueing next event will exceed EVP payload limit")
162162
self.periodic()
163-
self._buffer.append(event)
163+
self._buffer.append(encoded_event)
164164
self._buffer_size += event_size
165165

166166
def _encode(self, payload, num_events):
@@ -177,7 +177,7 @@ def periodic(self) -> None:
177177
with self._lock:
178178
if not self._buffer:
179179
return
180-
events = self._buffer
180+
enc_events = self._buffer
181181
self._buffer = []
182182
self._buffer_size = 0
183183

@@ -188,16 +188,13 @@ def periodic(self) -> None:
188188
"`LLMObs.enable(api_key=...)` before running your application."
189189
)
190190
return
191-
data = self._data(events)
192-
enc_llm_events = self._encode(data, len(events))
193-
if not enc_llm_events:
194-
return
191+
payload = self._data(enc_events)
195192
try:
196-
self._send_payload_with_retry(enc_llm_events, len(events))
193+
self._send_payload_with_retry(payload, len(enc_events))
197194
except Exception:
198-
telemetry.record_dropped_payload(len(events), event_type=self.EVENT_TYPE, error="connection_error")
195+
telemetry.record_dropped_payload(len(enc_events), event_type=self.EVENT_TYPE, error="connection_error")
199196
logger.error(
200-
"failed to send %d LLMObs %s events to %s", len(events), self.EVENT_TYPE, self._intake, exc_info=True
197+
"failed to send %d LLMObs %s events to %s", len(enc_events), self.EVENT_TYPE, self._intake, exc_info=True
201198
)
202199

203200
def _send_payload(self, payload: bytes, num_events: int):
@@ -263,11 +260,16 @@ class LLMObsEvalMetricWriter(BaseLLMObsWriter):
263260
ENDPOINT = EVAL_ENDPOINT
264261

265262
def enqueue(self, event: LLMObsEvaluationMetricEvent) -> None:
266-
event_size = len(safe_json(event))
267-
self._enqueue(event, event_size)
263+
encoded_event = self._encode(event, 1)
264+
if not encoded_event:
265+
return
266+
event_size = len(encoded_event)
267+
self._enqueue(encoded_event, event_size)
268268

269-
def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]:
270-
return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}}
269+
def _data(self, events: List[str]) -> str:
270+
metrics = ",".join(events)
271+
enc_payload = f'{{"data": {{"type": "evaluation_metric", "attributes": {{"metrics": [{metrics}]}}}}}}'
272+
return enc_payload
271273

272274

273275
class LLMObsSpanWriter(BaseLLMObsWriter):
@@ -279,7 +281,10 @@ class LLMObsSpanWriter(BaseLLMObsWriter):
279281
ENDPOINT = SPAN_ENDPOINT
280282

281283
def enqueue(self, event: LLMObsSpanEvent) -> None:
282-
raw_event_size = len(safe_json(event))
284+
encoded_event = self._encode(event, 1)
285+
if not encoded_event:
286+
return
287+
raw_event_size = len(encoded_event)
283288
truncated_event_size = None
284289
should_truncate = raw_event_size >= EVP_EVENT_SIZE_LIMIT
285290
if should_truncate:
@@ -288,16 +293,20 @@ def enqueue(self, event: LLMObsSpanEvent) -> None:
288293
raw_event_size,
289294
)
290295
event = _truncate_span_event(event)
291-
truncated_event_size = len(safe_json(event))
296+
encoded_event = self._encode(event, 1)
297+
truncated_event_size = len(encoded_event)
292298
telemetry.record_span_event_raw_size(event, raw_event_size)
293299
telemetry.record_span_event_size(event, truncated_event_size or raw_event_size)
294-
self._enqueue(event, truncated_event_size or raw_event_size)
295-
296-
def _data(self, events: List[LLMObsSpanEvent]) -> List[Dict[str, Any]]:
297-
return [
298-
{"_dd.stage": "raw", "_dd.tracer_version": ddtrace.__version__, "event_type": "span", "spans": [event]}
299-
for event in events
300-
]
300+
self._enqueue(encoded_event, truncated_event_size or raw_event_size)
301+
302+
def _data(self, events: List[str]) -> str:
303+
payload = []
304+
tracer_version = ddtrace.__version__
305+
for event in events:
306+
payload.append(
307+
f'{{"_dd.stage": "raw", "_dd.tracer_version": "{tracer_version}", "event_type": "span", "spans": [{event}]}}'
308+
)
309+
return "[{}]".format(",".join(payload))
301310

302311

303312
def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent:

0 commit comments

Comments
 (0)