Skip to content

Commit 479699d

Browse files
Yun-KimIAL32
and
IAL32
authored
fix(llmobs): reuse shared conn (#13339)
Resolves #13336. Credit to @IAL32 and a cherry-pick from #13338. When we made the jump from using the shared HTTPWriter to our own BaseLLMObsWriter class to submit spans and evals #12966, we used our own `_get_connection()` to return HTTP/HTTPS connections. However we forgot to include UDSHTTP connection (for the unix socket case), which means we broke UDS support until now. ### Why was this a problem in the first place? We used our own `_get_connection()` in #12966 because of an issue where creating the shared HTTPConnection helper class was leading to MRO superclass constructor issues in our tests. At the time we thought this was due to the shared HTTPConnection helper class having multiple superclasses and an issue with Python 3.10 in general, but this turns out to be due to vcrpy mocking HTTPConnection entirely and only being an issue in tests that rely on vcrpy. This PR makes some changes to avoid using vcrpy when not necessary, and making better assertions to ensure that spans are being sent (not necessary in most tests to have them be accepted). ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) [](https://datadoghq.atlassian.net/browse/MLOB-2725) --------- Co-authored-by: IAL32 <[email protected]>
1 parent 8fa23a4 commit 479699d

10 files changed

+49
-155
lines changed

ddtrace/llmobs/_writer.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import atexit
2-
import http.client as httplib
32
from typing import Any
43
from typing import Dict
54
from typing import List
@@ -19,7 +18,8 @@
1918
from ddtrace.internal import forksafe
2019
from ddtrace.internal.logger import get_logger
2120
from ddtrace.internal.periodic import PeriodicService
22-
from ddtrace.internal.utils.http import verify_url
21+
from ddtrace.internal.utils.http import Response
22+
from ddtrace.internal.utils.http import get_connection
2323
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
2424
from ddtrace.llmobs import _telemetry as telemetry
2525
from ddtrace.llmobs._constants import AGENTLESS_EVAL_BASE_URL
@@ -132,7 +132,7 @@ def __init__(
132132
self._send_payload_with_retry = fibonacci_backoff_with_jitter(
133133
attempts=self.RETRY_ATTEMPTS,
134134
initial_wait=0.618 * self.interval / (1.618**self.RETRY_ATTEMPTS) / 2,
135-
until=lambda result: isinstance(result, httplib.HTTPResponse),
135+
until=lambda result: isinstance(result, Response),
136136
)(self._send_payload)
137137

138138
def start(self, *args, **kwargs):
@@ -201,7 +201,7 @@ def periodic(self) -> None:
201201
)
202202

203203
def _send_payload(self, payload: bytes, num_events: int):
204-
conn = self._get_connection()
204+
conn = get_connection(self._intake)
205205
try:
206206
conn.request("POST", self._endpoint, payload, self._headers)
207207
resp = conn.getresponse()
@@ -217,7 +217,7 @@ def _send_payload(self, payload: bytes, num_events: int):
217217
telemetry.record_dropped_payload(num_events, event_type=self.EVENT_TYPE, error="http_error")
218218
else:
219219
logger.debug("sent %d LLMObs %s events to %s", num_events, self.EVENT_TYPE, self._url)
220-
return resp
220+
return Response.from_http_response(resp)
221221
except Exception:
222222
logger.error(
223223
"failed to send %d LLMObs %s events to %s", num_events, self.EVENT_TYPE, self._intake, exc_info=True
@@ -226,15 +226,6 @@ def _send_payload(self, payload: bytes, num_events: int):
226226
finally:
227227
conn.close()
228228

229-
def _get_connection(self):
230-
"""Return the connection to the LLM Observability endpoint."""
231-
parsed = verify_url(self._intake)
232-
if parsed.scheme == "https":
233-
return httplib.HTTPSConnection(parsed.hostname or "", parsed.port, timeout=self._timeout)
234-
elif parsed.scheme == "http":
235-
return httplib.HTTPConnection(parsed.hostname or "", parsed.port, timeout=self._timeout)
236-
raise ConnectionError("Unable to connect, invalid URL: %s", self._intake)
237-
238229
@property
239230
def _url(self) -> str:
240231
return f"{self._intake}{self._endpoint}"
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
LLM Observability: Resolves an issue where spans and evaluation metrics were not being sent via Unix sockets.

tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_eval_metric_agentless_writer.test_send_metric_bad_api_key.yaml

Lines changed: 0 additions & 33 deletions
This file was deleted.

tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml

Lines changed: 0 additions & 37 deletions
This file was deleted.

tests/llmobs/llmobs_cassettes/tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_bad_api_key.yaml

Lines changed: 0 additions & 43 deletions
This file was deleted.

tests/llmobs/test_llmobs_eval_metric_agent_writer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
INTAKE_ENDPOINT = agent_config.trace_agent_url
1414
AGENT_PROXY_URL = f"{INTAKE_ENDPOINT}{EVP_PROXY_AGENT_BASE_PATH}{EVAL_ENDPOINT}"
15+
UNIX_AGENT_INTAKE = "unix:///var/run/datadog/apm.sock"
16+
UNIX_AGENT_PROXY_URL = "{}{}{}".format(UNIX_AGENT_INTAKE, EVP_PROXY_AGENT_BASE_PATH, EVAL_ENDPOINT)
1517

1618

1719
def test_writer_start(mock_writer_logs):
@@ -21,6 +23,15 @@ def test_writer_start(mock_writer_logs):
2123
llmobs_eval_metric_writer.stop()
2224

2325

26+
def test_unix_socket_writer_start(mock_writer_logs):
27+
llmobs_eval_metric_writer = LLMObsEvalMetricWriter(1, 1, is_agentless=False, _override_url=UNIX_AGENT_INTAKE)
28+
llmobs_eval_metric_writer.start()
29+
mock_writer_logs.debug.assert_has_calls(
30+
[mock.call("started %r to %r", "LLMObsEvalMetricWriter", UNIX_AGENT_PROXY_URL)]
31+
)
32+
llmobs_eval_metric_writer.stop()
33+
34+
2435
def test_buffer_limit(mock_writer_logs):
2536
llmobs_eval_metric_writer = LLMObsEvalMetricWriter(1, 1, is_agentless=False)
2637
for _ in range(1001):

tests/llmobs/test_llmobs_eval_metric_agentless_writer.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def test_buffer_limit(mock_writer_logs):
6363
)
6464

6565

66-
@pytest.mark.vcr_logs
6766
def test_send_metric_bad_api_key(mock_writer_logs):
6867
llmobs_eval_metric_writer = LLMObsEvalMetricWriter(1, 1, is_agentless=True, _site=DD_SITE, _api_key="<bad-api-key>")
6968

@@ -97,7 +96,7 @@ def test_send_categorical_metric(mock_writer_logs):
9796
llmobs_eval_metric_writer.enqueue(_categorical_metric_event())
9897
llmobs_eval_metric_writer.periodic()
9998
mock_writer_logs.debug.assert_has_calls(
100-
[mock.call("sent %d LLMObs %s events to %s", 1, "evaluation_metric", INTAKE_ENDPOINT)]
99+
[mock.call("encoded %d LLMObs %s events to be sent", 1, "evaluation_metric")]
101100
)
102101

103102

@@ -107,7 +106,7 @@ def test_send_score_metric(mock_writer_logs):
107106
llmobs_eval_metric_writer.enqueue(_score_metric_event())
108107
llmobs_eval_metric_writer.periodic()
109108
mock_writer_logs.debug.assert_has_calls(
110-
[mock.call("sent %d LLMObs %s events to %s", 1, "evaluation_metric", INTAKE_ENDPOINT)]
109+
[mock.call("encoded %d LLMObs %s events to be sent", 1, "evaluation_metric")]
111110
)
112111

113112

@@ -120,13 +119,13 @@ def test_send_timed_events(mock_writer_logs):
120119
llmobs_eval_metric_writer.enqueue(_score_metric_event())
121120
time.sleep(0.1)
122121
mock_writer_logs.debug.assert_has_calls(
123-
[mock.call("sent %d LLMObs %s events to %s", 1, "evaluation_metric", INTAKE_ENDPOINT)]
122+
[mock.call("encoded %d LLMObs %s events to be sent", 1, "evaluation_metric")]
124123
)
125124
mock_writer_logs.reset_mock()
126125
llmobs_eval_metric_writer.enqueue(_categorical_metric_event())
127126
time.sleep(0.1)
128127
mock_writer_logs.debug.assert_has_calls(
129-
[mock.call("sent %d LLMObs %s events to %s", 1, "evaluation_metric", INTAKE_ENDPOINT)]
128+
[mock.call("encoded %d LLMObs %s events to be sent", 1, "evaluation_metric")]
130129
)
131130
llmobs_eval_metric_writer.stop()
132131

@@ -138,7 +137,9 @@ def test_send_multiple_events(mock_writer_logs):
138137
llmobs_eval_metric_writer.enqueue(_score_metric_event())
139138
llmobs_eval_metric_writer.enqueue(_categorical_metric_event())
140139
llmobs_eval_metric_writer.periodic()
141-
mock_writer_logs.debug.assert_called_with("sent %d LLMObs %s events to %s", 2, "evaluation_metric", INTAKE_ENDPOINT)
140+
mock_writer_logs.debug.assert_has_calls(
141+
[mock.call("encoded %d LLMObs %s events to be sent", 2, "evaluation_metric")]
142+
)
142143

143144

144145
def test_send_on_exit(mock_writer_logs, run_python_code_in_subprocess):
@@ -149,15 +150,9 @@ def test_send_on_exit(mock_writer_logs, run_python_code_in_subprocess):
149150
env.update({"PYTHONPATH": ":".join(pypath), "DD_LLMOBS_ML_APP": "unnamed-ml-app"})
150151
out, err, status, pid = run_python_code_in_subprocess(
151152
"""
152-
import atexit
153-
154153
from ddtrace.llmobs._writer import LLMObsEvalMetricWriter
155154
from tests.llmobs.test_llmobs_eval_metric_agentless_writer import _score_metric_event
156-
from tests.llmobs._utils import logs_vcr
157155
158-
ctx = logs_vcr.use_cassette("tests.llmobs.test_llmobs_eval_metric_agentless_writer.send_score_metric.yaml")
159-
ctx.__enter__()
160-
atexit.register(lambda: ctx.__exit__())
161156
llmobs_eval_metric_writer = LLMObsEvalMetricWriter(
162157
0.01, 1, is_agentless=True, _site="datad0g.com", _api_key="<not-a-real-key>"
163158
)
@@ -168,4 +163,8 @@ def test_send_on_exit(mock_writer_logs, run_python_code_in_subprocess):
168163
)
169164
assert status == 0, err
170165
assert out == b""
171-
assert err == b""
166+
assert b"got response code 403" in err
167+
assert (
168+
b'status: b\'{"status":"error","code":403,"errors":["Forbidden"],"statuspage":"http://status.datadoghq.com","twitter":"http://twitter.com/datadogops","email":"[email protected]"}\'\n'
169+
in err
170+
)

tests/llmobs/test_llmobs_evaluator_runner.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,16 +102,10 @@ def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subproces
102102
)
103103
out, err, status, pid = run_python_code_in_subprocess(
104104
"""
105-
import atexit
106-
107105
from ddtrace.llmobs import LLMObs
108106
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner
109-
from tests.llmobs._utils import logs_vcr
110107
from tests.llmobs._utils import DummyEvaluator
111108
112-
ctx = logs_vcr.use_cassette("tests.llmobs.test_llmobs_evaluator_runner.send_score_metric.yaml")
113-
ctx.__enter__()
114-
atexit.register(lambda: ctx.__exit__())
115109
LLMObs.enable(api_key="dummy-api-key", site="datad0g.com", ml_app="unnamed-ml-app", agentless_enabled=True)
116110
LLMObs._instance._evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs))
117111
LLMObs._instance._evaluator_runner.start()
@@ -121,7 +115,11 @@ def test_evaluator_runner_on_exit(mock_writer_logs, run_python_code_in_subproces
121115
)
122116
assert status == 0, err
123117
assert out == b""
124-
assert err == b""
118+
assert b"got response code 403" in err
119+
assert (
120+
b'status: b\'{"status":"error","code":403,"errors":["Forbidden"],"statuspage":"http://status.datadoghq.com","twitter":"http://twitter.com/datadogops","email":"[email protected]"}\'\n'
121+
in err
122+
)
125123

126124

127125
def test_evaluator_runner_unsupported_evaluator():

tests/llmobs/test_llmobs_span_agent_writer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,22 @@
1616

1717
INTAKE_ENDPOINT = agent_config.trace_agent_url
1818
AGENT_PROXY_URL = "{}{}{}".format(INTAKE_ENDPOINT, EVP_PROXY_AGENT_BASE_PATH, SPAN_ENDPOINT)
19+
UNIX_AGENT_INTAKE = "unix:///var/run/datadog/apm.sock"
20+
UNIX_AGENT_PROXY_URL = "{}{}{}".format(UNIX_AGENT_INTAKE, EVP_PROXY_AGENT_BASE_PATH, SPAN_ENDPOINT)
1921

2022

2123
def test_writer_start(mock_writer_logs):
2224
llmobs_span_writer = LLMObsSpanWriter(1, 1, is_agentless=False)
2325
llmobs_span_writer.start()
2426
mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", AGENT_PROXY_URL)])
27+
llmobs_span_writer.stop()
28+
29+
30+
def test_unix_socket_writer_start(mock_writer_logs):
31+
llmobs_span_writer = LLMObsSpanWriter(1, 1, is_agentless=False, _override_url=UNIX_AGENT_INTAKE)
32+
llmobs_span_writer.start()
33+
mock_writer_logs.debug.assert_has_calls([mock.call("started %r to %r", "LLMObsSpanWriter", UNIX_AGENT_PROXY_URL)])
34+
llmobs_span_writer.stop()
2535

2636

2737
def test_buffer_limit(mock_writer_logs):

tests/llmobs/test_llmobs_span_agentless_writer.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ def test_send_chat_completion_event(mock_writer_logs):
8787
mock_writer_logs.debug.assert_has_calls([mock.call("encoded %d LLMObs %s events to be sent", 1, "span")])
8888

8989

90-
@pytest.mark.vcr_logs
9190
def test_send_completion_bad_api_key(mock_writer_logs):
9291
llmobs_span_writer = LLMObsSpanWriter(1, 1, is_agentless=True, _site=DD_SITE, _api_key="<bad-api-key>")
9392
llmobs_span_writer.enqueue(_completion_event())
@@ -149,15 +148,9 @@ def test_send_on_exit(run_python_code_in_subprocess):
149148

150149
out, err, status, pid = run_python_code_in_subprocess(
151150
"""
152-
import atexit
153-
154151
from ddtrace.llmobs._writer import LLMObsSpanWriter
155152
from tests.llmobs.test_llmobs_span_agentless_writer import _completion_event
156-
from tests.llmobs._utils import logs_vcr
157153
158-
ctx = logs_vcr.use_cassette("tests.llmobs.test_llmobs_span_agentless_writer.test_send_completion_event.yaml")
159-
ctx.__enter__()
160-
atexit.register(lambda: ctx.__exit__())
161154
llmobs_span_writer = LLMObsSpanWriter(0.01, 1, is_agentless=True, _site="datad0g.com", _api_key="<not-a-real-key>")
162155
llmobs_span_writer.start()
163156
llmobs_span_writer.enqueue(_completion_event())
@@ -166,4 +159,5 @@ def test_send_on_exit(run_python_code_in_subprocess):
166159
)
167160
assert status == 0, err
168161
assert out == b""
169-
assert err == b""
162+
assert b"got response code 403" in err
163+
assert b'status: b\'{"errors":[{"status":"403","title":"Forbidden","detail":"API key is invalid"}]}\'\n' in err

0 commit comments

Comments
 (0)