Skip to content

Commit

Permalink
fix(llmobs): don't enqueue spans to an inactive evaluator runner (#12150)
Browse files Browse the repository at this point in the history

This fix resolves an issue where spans were being enqueued to the buffer
of an **inactive** evaluator runner, which caused noisy warning logs
related to the evaluator runner's buffer being full.

- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: lievan <[email protected]>
  • Loading branch information
lievan and lievan committed Feb 3, 2025
1 parent aa1fbaa commit 7ff4fa7
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 18 deletions.
5 changes: 4 additions & 1 deletion ddtrace/llmobs/_evaluators/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ddtrace.internal import forksafe
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.service import ServiceStatus
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT
from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
Expand Down Expand Up @@ -70,7 +71,7 @@ def start(self, *args, **kwargs):
logger.debug("no evaluators configured, not starting %r", self.__class__.__name__)
return
super(EvaluatorRunner, self).start()
logger.debug("started %r to %r", self.__class__.__name__)
logger.debug("started %r", self.__class__.__name__)

def _stop_service(self) -> None:
"""
Expand All @@ -88,6 +89,8 @@ def recreate(self) -> "EvaluatorRunner":
)

def enqueue(self, span_event: Dict, span: Span) -> None:
if self.status == ServiceStatus.STOPPED:
return
with self._lock:
if len(self._buffer) >= self._buffer_limit:
logger.warning(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
LLM Observability: This fix resolves an issue where spans were being enqueued to an inactive evaluator runner, which caused noisy logs
related to the evaluator runner buffer being full.
39 changes: 22 additions & 17 deletions tests/llmobs/test_llmobs_evaluator_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,44 @@
DUMMY_SPAN = Span("dummy_span")


def test_evaluator_runner_start(mock_evaluator_logs):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=mock.MagicMock()))
@pytest.fixture
def active_evaluator_runner(LLMObs):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs))
evaluator_runner.start()
mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")])
yield evaluator_runner


def test_evaluator_runner_start(mock_evaluator_logs, active_evaluator_runner):
mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")])

def test_evaluator_runner_buffer_limit(mock_evaluator_logs):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())

def test_evaluator_runner_buffer_limit(mock_evaluator_logs, active_evaluator_runner):
for _ in range(1001):
evaluator_runner.enqueue({}, DUMMY_SPAN)
active_evaluator_runner.enqueue({}, DUMMY_SPAN)
mock_evaluator_logs.warning.assert_called_with(
"%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000
)


def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs))
evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
evaluator_runner.periodic()
def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_writer, active_evaluator_runner):
active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
active_evaluator_runner.periodic()
mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
_dummy_evaluator_eval_metric_event(span_id="123", trace_id="1234")
)


@pytest.mark.vcr_logs
def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer):
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs))
def test_evaluator_runner_stopped_does_not_enqueue_metric(LLMObs, mock_llmobs_eval_metric_writer):
evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=LLMObs)
evaluator_runner.start()

evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
assert not evaluator_runner._buffer
assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0


def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, active_evaluator_runner):
active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)

time.sleep(0.1)

Expand Down
3 changes: 3 additions & 0 deletions tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1560,6 +1560,7 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():

def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator):
"""Test that forking a process correctly recreates and restarts the EvaluatorRunner."""
pytest.importorskip("ragas")
with override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")):
with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
Expand Down Expand Up @@ -1649,6 +1650,8 @@ def test_llmobs_fork_submit_evaluation(monkeypatch):
def test_llmobs_fork_evaluator_runner_run(monkeypatch):
"""Test that forking a process correctly encodes new spans created in each process."""
monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0)
pytest.importorskip("ragas")
monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness")
with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key")
pid = os.fork()
Expand Down

0 comments on commit 7ff4fa7

Please sign in to comment.