diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index 7f08b258f62..3eb7cd75490 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -8,6 +8,7 @@ from ddtrace.internal import service from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService +from ddtrace.internal.service import ServiceStatus from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSampler @@ -54,7 +55,7 @@ def start(self, *args, **kwargs): logger.debug("no evaluators configured, not starting %r", self.__class__.__name__) return super(EvaluatorRunner, self).start() - logger.debug("started %r to %r", self.__class__.__name__) + logger.debug("started %r", self.__class__.__name__) atexit.register(self.on_shutdown) def stop(self, *args, **kwargs): @@ -73,6 +74,8 @@ def on_shutdown(self): self.executor.shutdown() def enqueue(self, span_event: Dict, span: Span) -> None: + if self.status == ServiceStatus.STOPPED: + return with self._lock: if len(self._buffer) >= self._buffer_limit: logger.warning( diff --git a/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml new file mode 100644 index 00000000000..7149c48c4f1 --- /dev/null +++ b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + LLM Observability: This fix resolves an issue where spans were being enqueued to an inactive evaluator runner, which caused noisy logs + related to the evaluator runner buffer being full.
diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index a846914b3ac..cde58dbbdd5 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -18,6 +18,14 @@ DUMMY_SPAN = Span("dummy_span") +@pytest.fixture +def active_evaluator_runner(LLMObs, mock_ragas_evaluator): + evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) + evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) + evaluator_runner.start() + yield evaluator_runner + + def _dummy_ragas_eval_metric_event(span_id, trace_id): return LLMObsEvaluationMetricEvent( span_id=span_id, @@ -31,39 +39,37 @@ def _dummy_ragas_eval_metric_event(span_id, trace_id): ) -def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) - evaluator_runner.evaluators.append(mock_ragas_evaluator) - evaluator_runner.start() - mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")]) +def test_evaluator_runner_start(mock_evaluator_logs, active_evaluator_runner): + mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")]) -def test_evaluator_runner_buffer_limit(mock_evaluator_logs): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) +def test_evaluator_runner_buffer_limit(mock_evaluator_logs, active_evaluator_runner): for _ in range(1001): - evaluator_runner.enqueue({}, DUMMY_SPAN) + active_evaluator_runner.enqueue({}, DUMMY_SPAN) mock_evaluator_logs.warning.assert_called_with( "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000 ) -def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - 
evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) - evaluator_runner.periodic() +def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + active_evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") ) -@pytest.mark.vcr_logs -def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) +def test_evaluator_runner_stopped_does_not_enqueue_metric(LLMObs, mock_llmobs_eval_metric_writer): + evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=LLMObs) evaluator_runner.start() evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + assert not evaluator_runner._buffer + assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0 + + +def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) time.sleep(0.1) diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index a3768789939..c5f159ee791 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1601,6 +1601,7 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer(): def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator): """Test that forking a process correctly recreates and restarts the EvaluatorRunner.""" + pytest.importorskip("ragas") with 
override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")): with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app") @@ -1690,6 +1691,8 @@ def test_llmobs_fork_submit_evaluation(monkeypatch): def test_llmobs_fork_evaluator_runner_run(monkeypatch): """Test that forking a process correctly encodes new spans created in each process.""" monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0) + pytest.importorskip("ragas") + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness") with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key") pid = os.fork()