fix(llmobs): don't enqueue spans to an inactive evaluator runner [backport 2.16] #12203

Draft · wants to merge 2 commits into base: 2.16
5 changes: 4 additions & 1 deletion ddtrace/llmobs/_evaluators/runner.py
@@ -8,6 +8,7 @@
from ddtrace.internal import service
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.service import ServiceStatus
from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator
from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSampler

@@ -54,7 +55,7 @@ def start(self, *args, **kwargs):
            logger.debug("no evaluators configured, not starting %r", self.__class__.__name__)
            return
        super(EvaluatorRunner, self).start()
        logger.debug("started %r to %r", self.__class__.__name__)
        logger.debug("started %r", self.__class__.__name__)
        atexit.register(self.on_shutdown)

    def stop(self, *args, **kwargs):
@@ -73,6 +74,8 @@ def on_shutdown(self):
        self.executor.shutdown()

    def enqueue(self, span_event: Dict, span: Span) -> None:
        if self.status == ServiceStatus.STOPPED:
            return
        with self._lock:
            if len(self._buffer) >= self._buffer_limit:
                logger.warning(
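
For context, here is roughly how the guarded `enqueue` reads with the hunk above applied. The `ServiceStatus` import and the early return come from the diff; the buffer-append tail and the module scaffolding are assumptions, since the hunk is truncated at the warning call.

```python
from typing import Dict

from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import PeriodicService
from ddtrace.internal.service import ServiceStatus

logger = get_logger(__name__)


class EvaluatorRunner(PeriodicService):
    # Sketch only: __init__, start, stop, periodic and the evaluator bookkeeping
    # live in the real module and are omitted here. Attribute names (_lock,
    # _buffer, _buffer_limit) are taken from the diff.

    def enqueue(self, span_event: Dict, span) -> None:
        # New guard from this PR: a runner that never started (no evaluators
        # configured) or was already stopped drops the event instead of buffering it.
        if self.status == ServiceStatus.STOPPED:
            return
        with self._lock:
            if len(self._buffer) >= self._buffer_limit:
                logger.warning(
                    "%r event buffer full (limit is %d), dropping event",
                    self.__class__.__name__,
                    self._buffer_limit,
                )
                return
            # Assumed tail of the method: queue the pair for the next periodic() run.
            self._buffer.append((span_event, span))
```
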
@@ -0,0 +1,5 @@
---
fixes:
  - |
    LLM Observability: This fix resolves an issue where spans were being enqueued to an inactive evaluator runner,
    which caused noisy logs related to the evaluator runner buffer being full.
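
As a rough illustration of the symptom the note describes (a hypothetical sketch; applications normally never drive `EvaluatorRunner` directly, LLM Observability manages it internally):

```python
from ddtrace.llmobs._evaluators.runner import EvaluatorRunner

# No evaluators are configured, so start() bails out early and the runner's
# service status stays STOPPED.
runner = EvaluatorRunner(interval=0.01, llmobs_service=None)  # None stands in for a real LLMObs service
runner.start()

for _ in range(2000):
    # Before this fix: the 1000-event buffer filled up and every further call
    # emitted the "%r event buffer full (limit is %d), dropping event" warning.
    # After this fix: enqueue() returns immediately because the runner is inactive.
    runner.enqueue({"span_id": "123", "trace_id": "456"}, None)  # None stands in for a real Span
```
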
41 changes: 23 additions & 18 deletions tests/llmobs/test_llmobs_evaluator_runner.py
@@ -18,6 +18,14 @@
DUMMY_SPAN = Span("dummy_span")


@pytest.fixture
def active_evaluator_runner(LLMObs, mock_ragas_evaluator):
    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
    evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
    evaluator_runner.start()
    yield evaluator_runner


def _dummy_ragas_eval_metric_event(span_id, trace_id):
    return LLMObsEvaluationMetricEvent(
        span_id=span_id,
@@ -31,39 +39,36 @@ def _dummy_ragas_eval_metric_event(span_id, trace_id):
    )


def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator):
    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
    evaluator_runner.evaluators.append(mock_ragas_evaluator)
    evaluator_runner.start()
    mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")])
def test_evaluator_runner_start(mock_evaluator_logs, active_evaluator_runner):
    mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")])


def test_evaluator_runner_buffer_limit(mock_evaluator_logs):
    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock())
def test_evaluator_runner_buffer_limit(mock_evaluator_logs, active_evaluator_runner):
    for _ in range(1001):
        evaluator_runner.enqueue({}, DUMMY_SPAN)
        active_evaluator_runner.enqueue({}, DUMMY_SPAN)
    mock_evaluator_logs.warning.assert_called_with(
        "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000
    )


def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
    evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
    evaluator_runner.periodic()
def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_writer, active_evaluator_runner):
    active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
    active_evaluator_runner.periodic()
    mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with(
        _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234")
    )


@pytest.mark.vcr_logs
def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator):
    evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs)
    evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs))
def test_evaluator_runner_stopped_does_not_enqueue_metric(LLMObs, mock_llmobs_eval_metric_writer):
    evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=LLMObs)
    evaluator_runner.start()

    evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)
    assert not evaluator_runner._buffer
    assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0


def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, active_evaluator_runner):
    active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN)

    time.sleep(0.1)

3 changes: 3 additions & 0 deletions tests/llmobs/test_llmobs_service.py
@@ -1601,6 +1601,7 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer():

def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator):
"""Test that forking a process correctly recreates and restarts the EvaluatorRunner."""
pytest.importorskip("ragas")
with override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")):
with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app")
@@ -1690,6 +1691,8 @@ def test_llmobs_fork_submit_evaluation(monkeypatch):
def test_llmobs_fork_evaluator_runner_run(monkeypatch):
"""Test that forking a process correctly encodes new spans created in each process."""
monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0)
pytest.importorskip("ragas")
monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness")
with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"):
llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key")
pid = os.fork()