From fb00b345c4063dc197b9b564460e9ba74be1cb6f Mon Sep 17 00:00:00 2001 From: lievan <42917263+lievan@users.noreply.github.com> Date: Thu, 30 Jan 2025 13:37:22 -0500 Subject: [PATCH] fix(llmobs): don't enqueue spans to an inactive evaluator runner (#12150) This fix resolves an issue where spans were being enqueued to the buffer of a **inactive** evaluator runner, which caused noisy warning logs related to the evaluator runner's buffer being full - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --------- Co-authored-by: lievan --- ddtrace/llmobs/_evaluators/runner.py | 5 ++- ...evaluator-runner-log-fcd0aa18602d4252.yaml | 5 +++ tests/llmobs/test_llmobs_evaluator_runner.py | 39 +++++++++++-------- tests/llmobs/test_llmobs_service.py | 3 ++ 4 files changed, 34 insertions(+), 18 deletions(-) create mode 100644 releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml diff --git a/ddtrace/llmobs/_evaluators/runner.py b/ddtrace/llmobs/_evaluators/runner.py index bf45e618e01..b470f1b98a9 100644 --- a/ddtrace/llmobs/_evaluators/runner.py +++ b/ddtrace/llmobs/_evaluators/runner.py @@ -6,6 +6,7 @@ from ddtrace.internal import forksafe from ddtrace.internal.logger import get_logger from ddtrace.internal.periodic import PeriodicService +from ddtrace.internal.service import ServiceStatus from ddtrace.internal.telemetry import telemetry_writer from ddtrace.internal.telemetry.constants import TELEMETRY_APM_PRODUCT from ddtrace.llmobs._evaluators.ragas.faithfulness import RagasFaithfulnessEvaluator @@ -70,7 +71,7 @@ def start(self, *args, **kwargs): logger.debug("no evaluators configured, not starting %r", self.__class__.__name__) return super(EvaluatorRunner, self).start() - logger.debug("started %r to %r", self.__class__.__name__) + logger.debug("started %r", self.__class__.__name__) def _stop_service(self) -> None: """ @@ -88,6 +89,8 @@ def recreate(self) -> "EvaluatorRunner": ) def enqueue(self, span_event: Dict, span: Span) -> None: + if self.status == ServiceStatus.STOPPED: + return with self._lock: if len(self._buffer) >= self._buffer_limit: logger.warning( diff --git a/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml new file mode 100644 index 00000000000..7149c48c4f1 --- /dev/null +++ b/releasenotes/notes/fix-evaluator-runner-log-fcd0aa18602d4252.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + LLM Observability: This fix resolves an issue where spans were being enqueued to an inactive evaluator runner which caused noisy logs + related to the evaluator runner buffer being full. diff --git a/tests/llmobs/test_llmobs_evaluator_runner.py b/tests/llmobs/test_llmobs_evaluator_runner.py index 7ee7d510276..c556656ea90 100644 --- a/tests/llmobs/test_llmobs_evaluator_runner.py +++ b/tests/llmobs/test_llmobs_evaluator_runner.py @@ -18,39 +18,44 @@ DUMMY_SPAN = Span("dummy_span") -def test_evaluator_runner_start(mock_evaluator_logs): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) - evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=mock.MagicMock())) +@pytest.fixture +def active_evaluator_runner(LLMObs): + evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) + evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs)) evaluator_runner.start() - mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r to %r", "EvaluatorRunner")]) + yield evaluator_runner + +def test_evaluator_runner_start(mock_evaluator_logs, active_evaluator_runner): + mock_evaluator_logs.debug.assert_has_calls([mock.call("started %r", "EvaluatorRunner")]) -def test_evaluator_runner_buffer_limit(mock_evaluator_logs): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=mock.MagicMock()) + +def test_evaluator_runner_buffer_limit(mock_evaluator_logs, active_evaluator_runner): for _ in range(1001): - evaluator_runner.enqueue({}, DUMMY_SPAN) + active_evaluator_runner.enqueue({}, DUMMY_SPAN) mock_evaluator_logs.warning.assert_called_with( "%r event buffer full (limit is %d), dropping event", "EvaluatorRunner", 1000 ) -def test_evaluator_runner_periodic_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs)) - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) - evaluator_runner.periodic() +def test_evaluator_runner_periodic_enqueues_eval_metric(mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + active_evaluator_runner.periodic() mock_llmobs_eval_metric_writer.enqueue.assert_called_once_with( _dummy_evaluator_eval_metric_event(span_id="123", trace_id="1234") ) -@pytest.mark.vcr_logs -def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer): - evaluator_runner = EvaluatorRunner(interval=0.01, llmobs_service=LLMObs) - evaluator_runner.evaluators.append(DummyEvaluator(llmobs_service=LLMObs)) +def test_evaluator_runner_stopped_does_not_enqueue_metric(LLMObs, mock_llmobs_eval_metric_writer): + evaluator_runner = EvaluatorRunner(interval=0.1, llmobs_service=LLMObs) evaluator_runner.start() - evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) + assert not evaluator_runner._buffer + assert mock_llmobs_eval_metric_writer.enqueue.call_count == 0 + + +def test_evaluator_runner_timed_enqueues_eval_metric(LLMObs, mock_llmobs_eval_metric_writer, active_evaluator_runner): + active_evaluator_runner.enqueue({"span_id": "123", "trace_id": "1234"}, DUMMY_SPAN) time.sleep(0.1) diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 160023f5df7..bd25d725c54 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -1581,6 +1581,7 @@ def test_llmobs_fork_recreates_and_restarts_eval_metric_writer(): def test_llmobs_fork_recreates_and_restarts_evaluator_runner(mock_ragas_evaluator): """Test that forking a process correctly recreates and restarts the EvaluatorRunner.""" + pytest.importorskip("ragas") with override_env(dict(_DD_LLMOBS_EVALUATORS="ragas_faithfulness")): with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app") @@ -1670,6 +1671,8 @@ def test_llmobs_fork_submit_evaluation(monkeypatch): def test_llmobs_fork_evaluator_runner_run(monkeypatch): """Test that forking a process correctly encodes new spans created in each process.""" monkeypatch.setenv("_DD_LLMOBS_EVALUATOR_INTERVAL", 5.0) + pytest.importorskip("ragas") + monkeypatch.setenv("_DD_LLMOBS_EVALUATORS", "ragas_faithfulness") with mock.patch("ddtrace.llmobs._evaluators.runner.EvaluatorRunner.periodic"): llmobs_service.enable(_tracer=DummyTracer(), ml_app="test_app", api_key="test_api_key") pid = os.fork()