From 5faab946e7dc3fddff2585e90b5177ab7c82c7f6 Mon Sep 17 00:00:00 2001 From: lievan <42917263+lievan@users.noreply.github.com> Date: Thu, 30 Jan 2025 13:37:22 -0500 Subject: [PATCH 1/2] fix(llmobs): don't enqueue spans to an inactive evaluator runner (#12150) This fix resolves an issue where spans were being enqueued to the buffer of an **inactive** evaluator runner, which caused noisy warning logs related to the evaluator runner's buffer being full - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: lievan --- ddtrace/llmobs/_evaluators/runner.py | 3 ++ ...evaluator-runner-log-fcd0aa18602d4252.yaml | 5 +++ 
tests/llmobs/test_llmobs_evaluator_runner.py | 41 +++++++++++-------- tests/llmobs/test_llmobs_service.py | 3 ++ 4 files changed, 34 insertions(+), 18 deletions(-) create mode 100644 releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index 7f08b258f62..e58b4c318bf 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -8,6 +8,7 @@ from ddtrace.internal import service from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService +from ddtrace.internal.service import ServiceStatus from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator from ddtrace.llmobs._evaluators.sampler import EvaluatorRunnerSampler @@ -73,6 +74,8 @@ def on_shutdown(self): self.executor.shutdown() def enqueue(self, span_event: Dict, span: Span) -> None: + if self.status == ServiceStatus.STOPPED: + return with self._lock: if len(self._buffer) >= self._buffer_limit: logger.warning( diff --git a/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml new file mode 100644 index 00000000000..7149c48c4f1 --- /dev/null +++ b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + LLM Observability: This fix resolves an issue where spans were being enqueued to an inactive evaluator runner which caused noisy logs + related to the evaluator runner buffer being full. 
diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index a846914b3ac..9f938a412e2 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -18,6 +18,14 @@ DUMMY_SPAN = Span("dummy_span") +@pytest.fixture +def active_evaluator_runner(llmobs, mock_ragas_evaluator): + evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=llmobs) + evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=llmobs)) + evaluator_runner.start() + yield evaluator_runner + + def _dummy_ragas_eval_metric_event(span_id, trace_id): return LLMObsEvaluationMetricEvent( span_id=span_id, @@ -31,39 +39,36 @@ def _dummy_ragas_eval_metric_event(span_id, trace_id): ) -def test_evaluator_runner_start(mock_evaluator_logs, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) - evaluator_runner.evaluators.append(mock_ragas_evaluator) - evaluator_runner.start() - mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")]) +def test_evaluator_runner_start(mock_evaluator_logs, active_evaluator_runner): + mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")]) -def test_evaluator_runner_buffer_limit(mock_evaluator_logs): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) +def test_evaluator_runner_buffer_limit(mock_evaluator_logs, active_evaluator_runner): for _ in range(1001): - evaluator_runner.enqueue({}, DUMMY_SPAN) + active_evaluator_runner.enqueue({}, DUMMY_SPAN) mock_evaluator_logs.warning.assert_called_with( "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000 ) -def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - 
evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) - evaluator_runner.periodic() +def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + active_evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( _dummy_ragas_eval_metric_event(span_id="123", trace_id="1234") ) -@pytest.mark.vcr_logs -def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, mock_ragas_evaluator): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) +def test_evaluator_runner_stopped_does_not_enqueue_metric(llmobs, mock_llmobs_eval_metric_writer): + evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=llmobs) evaluator_runner.start() - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + assert not evaluator_runner._buffer + assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0 + + +def test_evaluator_runner_timed_enqueues_eval_metric(llmobs, mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) time.sleep(0.1) diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index a3768789939..c5f159ee791 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1601,6 +1601,7 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer(): def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator): """Test that forking a process correctly recreates and restarts the EvaluatorRunner.""" + pytest.importorskip("ragas") with 
override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")): with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app") @@ -1690,6 +1691,8 @@ def test_llmobs_fork_submit_evaluation(monkeypatch): def test_llmobs_fork_evaluator_runner_run(monkeypatch): """Test that forking a process correctly encodes new spans created in each process.""" monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0) + pytest.importorskip("ragas") + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness") with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key") pid = os.fork() From d6362d481bc241c918040108cc4ab1434055a355 Mon Sep 17 00:00:00 2001 From: lievan Date: Mon, 3 Feb 2025 11:16:55 -0500 Subject: [PATCH 2/2] fix test --- ddtrace/llmobs/_evaluators/runner.py | 2 +- tests/llmobs/test_llmobs_evaluator_runner.py | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index e58b4c318bf..3eb7cd75490 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -55,7 +55,7 @@ def start(self, *args, **kwargs): logger.debug("no evaluators configured, not starting %r", self.__class__.__name__) return super(EvaluatorRunner, self).start() - logger.debug("started %r to %r", self.__class__.__name__) + logger.debug("started %r", self.__class__.__name__) atexit.register(self.on_shutdown) def stop(self, *args, **kwargs): diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index 9f938a412e2..cde58dbbdd5 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -19,9 +19,9 @@ @pytest.fixture -def active_evaluator_runner(llmobs, mock_ragas_evaluator): - 
evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=llmobs) - evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=llmobs)) +def active_evaluator_runner(LLMObs, mock_ragas_evaluator): + evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) + evaluator_runner.evaluators.append(mock_ragas_evaluator(llmobs_service=LLMObs)) evaluator_runner.start() yield evaluator_runner @@ -59,15 +59,15 @@ def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_ ) -def test_evaluator_runner_stopped_does_not_enqueue_metric(llmobs, mock_llmobs_eval_metric_writer): - evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=llmobs) +def test_evaluator_runner_stopped_does_not_enqueue_metric(LLMObs, mock_llmobs_eval_metric_writer): + evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=LLMObs) evaluator_runner.start() evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) assert not evaluator_runner._buffer assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0 -def test_evaluator_runner_timed_enqueues_eval_metric(llmobs, mock_llmobs_eval_metric_writer, active_evaluator_runner): +def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, active_evaluator_runner): active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) time.sleep(0.1)