Skip to content

Commit 1e15750

Browse files
committed
working wip
1 parent 1e1626a commit 1e15750

File tree

1 file changed

+37
-24
lines changed

1 file changed

+37
-24
lines changed

ddtrace/llmobs/_writer.py

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ def __init__(
111111
) -> None:
112112
super(BaseLLMObsWriter, self).__init__(interval=interval)
113113
self._lock = forksafe.RLock()
114-
self._buffer: List[Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent]] = []
114+
self._buffer: List[str] = []
115115
self._buffer_size: int = 0
116116
self._timeout: float = timeout
117117
self._api_key: str = _api_key or config._dd_api_key
@@ -148,7 +148,7 @@ def stop(self, timeout=None):
148148
def on_shutdown(self):
149149
self.periodic()
150150

151-
def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], event_size: int) -> None:
151+
def _enqueue(self, encoded_event: str, event_size: int) -> None:
152152
"""Internal shared logic of enqueuing events to be submitted to LLM Observability."""
153153
with self._lock:
154154
if len(self._buffer) >= self.BUFFER_LIMIT:
@@ -160,7 +160,7 @@ def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], e
160160
if self._buffer_size + event_size > EVP_PAYLOAD_SIZE_LIMIT:
161161
logger.debug("manually flushing buffer because queueing next event will exceed EVP payload limit")
162162
self.periodic()
163-
self._buffer.append(event)
163+
self._buffer.append(encoded_event)
164164
self._buffer_size += event_size
165165

166166
def _encode(self, payload, num_events):
@@ -177,7 +177,7 @@ def periodic(self) -> None:
177177
with self._lock:
178178
if not self._buffer:
179179
return
180-
events = self._buffer
180+
enc_events = self._buffer
181181
self._buffer = []
182182
self._buffer_size = 0
183183

@@ -188,16 +188,17 @@ def periodic(self) -> None:
188188
"`LLMObs.enable(api_key=...)` before running your application."
189189
)
190190
return
191-
data = self._data(events)
192-
enc_llm_events = self._encode(data, len(events))
193-
if not enc_llm_events:
194-
return
191+
payload = self._data(enc_events)
195192
try:
196-
self._send_payload_with_retry(enc_llm_events, len(events))
193+
self._send_payload_with_retry(payload, len(enc_events))
197194
except Exception:
198-
telemetry.record_dropped_payload(len(events), event_type=self.EVENT_TYPE, error="connection_error")
195+
telemetry.record_dropped_payload(len(enc_events), event_type=self.EVENT_TYPE, error="connection_error")
199196
logger.error(
200-
"failed to send %d LLMObs %s events to %s", len(events), self.EVENT_TYPE, self._intake, exc_info=True
197+
"failed to send %d LLMObs %s events to %s",
198+
len(enc_events),
199+
self.EVENT_TYPE,
200+
self._intake,
201+
exc_info=True,
201202
)
202203

203204
def _send_payload(self, payload: bytes, num_events: int):
@@ -263,11 +264,16 @@ class LLMObsEvalMetricWriter(BaseLLMObsWriter):
263264
ENDPOINT = EVAL_ENDPOINT
264265

265266
def enqueue(self, event: LLMObsEvaluationMetricEvent) -> None:
266-
event_size = len(safe_json(event))
267-
self._enqueue(event, event_size)
267+
encoded_event = self._encode(event, 1)
268+
if not encoded_event:
269+
return
270+
event_size = len(encoded_event)
271+
self._enqueue(encoded_event, event_size)
268272

269-
def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]:
270-
return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}}
273+
def _data(self, events: List[str]) -> str:
274+
metrics = ",".join(events)
275+
enc_payload = f'{{"data": {{"type": "evaluation_metric", "attributes": {{"metrics": [{metrics}]}}}}}}'
276+
return enc_payload
271277

272278

273279
class LLMObsSpanWriter(BaseLLMObsWriter):
@@ -279,7 +285,10 @@ class LLMObsSpanWriter(BaseLLMObsWriter):
279285
ENDPOINT = SPAN_ENDPOINT
280286

281287
def enqueue(self, event: LLMObsSpanEvent) -> None:
282-
raw_event_size = len(safe_json(event))
288+
encoded_event = self._encode(event, 1)
289+
if not encoded_event:
290+
return
291+
raw_event_size = len(encoded_event)
283292
truncated_event_size = None
284293
should_truncate = raw_event_size >= EVP_EVENT_SIZE_LIMIT
285294
if should_truncate:
@@ -288,16 +297,20 @@ def enqueue(self, event: LLMObsSpanEvent) -> None:
288297
raw_event_size,
289298
)
290299
event = _truncate_span_event(event)
291-
truncated_event_size = len(safe_json(event))
300+
encoded_event = self._encode(event, 1)
301+
truncated_event_size = len(encoded_event)
292302
telemetry.record_span_event_raw_size(event, raw_event_size)
293303
telemetry.record_span_event_size(event, truncated_event_size or raw_event_size)
294-
self._enqueue(event, truncated_event_size or raw_event_size)
295-
296-
def _data(self, events: List[LLMObsSpanEvent]) -> List[Dict[str, Any]]:
297-
return [
298-
{"_dd.stage": "raw", "_dd.tracer_version": ddtrace.__version__, "event_type": "span", "spans": [event]}
299-
for event in events
300-
]
304+
self._enqueue(encoded_event, truncated_event_size or raw_event_size)
305+
306+
def _data(self, events: List[str]) -> str:
307+
payload = []
308+
tracer_version = ddtrace.__version__
309+
for event in events:
310+
payload.append(
311+
f'{{"_dd.stage": "raw", "_dd.tracer_version": "{tracer_version}", "event_type": "span", "spans": [{event}]}}'
312+
)
313+
return "[{}]".format(",".join(payload))
301314

302315

303316
def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent:

0 commit comments

Comments
 (0)