From 561d6592ca6d32d344280abf88ea50c704c3fd29 Mon Sep 17 00:00:00 2001 From: Yun Kim Date: Wed, 5 Feb 2025 18:32:19 -0500 Subject: [PATCH] llmobs-specific context manager --- ddtrace/contrib/internal/trace_utils.py | 1 + ddtrace/llmobs/_constants.py | 2 + ddtrace/llmobs/_context.py | 59 +++++ ddtrace/llmobs/_integrations/base.py | 28 +- ddtrace/llmobs/_integrations/bedrock.py | 8 +- ddtrace/llmobs/_integrations/langgraph.py | 5 +- ddtrace/llmobs/_llmobs.py | 120 ++++++--- ddtrace/llmobs/_utils.py | 37 +-- ddtrace/llmobs/_writer.py | 1 + .../feat-llmobs-context-e4adabcb6894e4d8.yaml | 5 + tests/llmobs/_utils.py | 13 + tests/llmobs/test_llmobs.py | 38 ++- tests/llmobs/test_llmobs_service.py | 17 +- tests/llmobs/test_propagation.py | 240 ++++++++---------- 14 files changed, 303 insertions(+), 271 deletions(-) create mode 100644 ddtrace/llmobs/_context.py create mode 100644 releasenotes/notes/feat-llmobs-context-e4adabcb6894e4d8.yaml diff --git a/ddtrace/contrib/internal/trace_utils.py b/ddtrace/contrib/internal/trace_utils.py index 90cf4e44ae0..9eec2818398 100644 --- a/ddtrace/contrib/internal/trace_utils.py +++ b/ddtrace/contrib/internal/trace_utils.py @@ -595,6 +595,7 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None, # We have parsed a trace id from headers, and we do not already # have a context with the same trace id active tracer.context_provider.activate(context) + core.dispatch("http.activate_distributed_headers", (request_headers, context)) def _flatten( diff --git a/ddtrace/llmobs/_constants.py b/ddtrace/llmobs/_constants.py index 1f1fbaa3eee..a47df22654f 100644 --- a/ddtrace/llmobs/_constants.py +++ b/ddtrace/llmobs/_constants.py @@ -45,6 +45,8 @@ DROPPED_IO_COLLECTION_ERROR = "dropped_io" DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]" +ROOT_PARENT_ID = "undefined" + # Set for traces of evaluator integrations e.g. `runner.integration:ragas`. # Used to differentiate traces of Datadog-run operations vs user-application operations. RUNNER_IS_INTEGRATION_SPAN_TAG = "runner.integration" diff --git a/ddtrace/llmobs/_context.py b/ddtrace/llmobs/_context.py new file mode 100644 index 00000000000..c6496954af5 --- /dev/null +++ b/ddtrace/llmobs/_context.py @@ -0,0 +1,59 @@ +import contextvars +from typing import Optional +from typing import Union + +from ddtrace._trace.context import Context +from ddtrace._trace.provider import DefaultContextProvider +from ddtrace._trace.span import Span +from ddtrace.ext import SpanTypes + + +ContextTypeValue = Optional[Union[Context, Span]] + + +_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar( + "datadog_llmobs_contextvar", + default=None, +) + + +class LLMObsContextProvider(DefaultContextProvider): + """Context provider that retrieves contexts from a context variable. + It is suitable for synchronous programming and for asynchronous executors + that support contextvars. + """ + + def __init__(self) -> None: + super(DefaultContextProvider, self).__init__() + _DD_LLMOBS_CONTEXTVAR.set(None) + + def _has_active_context(self) -> bool: + """Returns whether there is an active context in the current execution.""" + ctx = _DD_LLMOBS_CONTEXTVAR.get() + return ctx is not None + + def _update_active(self, span: Span) -> Optional[Span]: + """Updates the active LLMObs span. + The active span is updated to be the span's closest unfinished LLMObs ancestor span. + """ + if not span.finished: + return span + new_active: Optional[Span] = span + while new_active and new_active.finished: + new_active = new_active._parent + if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM: + break + self.activate(new_active) + return new_active + + def activate(self, ctx: ContextTypeValue) -> None: + """Makes the given context active in the current execution.""" + _DD_LLMOBS_CONTEXTVAR.set(ctx) + super(DefaultContextProvider, self).activate(ctx) + + def active(self) -> ContextTypeValue: + """Returns the active span or context for the current execution.""" + item = _DD_LLMOBS_CONTEXTVAR.get() + if isinstance(item, Span): + return self._update_active(item) + return item diff --git a/ddtrace/llmobs/_integrations/base.py b/ddtrace/llmobs/_integrations/base.py index a098c899014..ab77d4a52ca 100644 --- a/ddtrace/llmobs/_integrations/base.py +++ b/ddtrace/llmobs/_integrations/base.py @@ -18,11 +18,8 @@ from ddtrace.internal.telemetry import telemetry_writer from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE from ddtrace.internal.utils.formats import asbool -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._llmobs import LLMObs from ddtrace.llmobs._log_writer import V2LogWriter -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.settings import IntegrationConfig from ddtrace.trace import Pin from ddtrace.trace import Span @@ -132,21 +129,16 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k span.set_tag(_SPAN_MEASURED_KEY) self._set_base_span_tags(span, **kwargs) if submit_to_llmobs and self.llmobs_enabled: - if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - # For non-distributed traces or spans in the first service of a distributed trace, - # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now - # in these cases to avoid conflicting with the later propagated tags. - parent_id = _get_llmobs_parent_id(span) or "undefined" - span._set_ctx_item(PARENT_ID_KEY, str(parent_id)) - telemetry_writer.add_count_metric( - namespace=TELEMETRY_NAMESPACE.MLOBS, - name="span.start", - value=1, - tags=( - ("integration", self._integration_name), - ("autoinstrumented", "true"), - ), - ) + LLMObs._instance._activate_llmobs_span(span) + telemetry_writer.add_count_metric( + namespace=TELEMETRY_NAMESPACE.MLOBS, + name="span.start", + value=1, + tags=( + ("integration", self._integration_name), + ("autoinstrumented", "true"), + ), + ) return span @classmethod diff --git a/ddtrace/llmobs/_integrations/bedrock.py b/ddtrace/llmobs/_integrations/bedrock.py index cbc1456fc24..2db9de790dd 100644 --- a/ddtrace/llmobs/_integrations/bedrock.py +++ b/ddtrace/llmobs/_integrations/bedrock.py @@ -4,18 +4,16 @@ from typing import Optional from ddtrace.internal.logger import get_logger +from ddtrace.llmobs import LLMObs from ddtrace.llmobs._constants import INPUT_MESSAGES from ddtrace.llmobs._constants import METADATA from ddtrace.llmobs._constants import METRICS from ddtrace.llmobs._constants import MODEL_NAME from ddtrace.llmobs._constants import MODEL_PROVIDER from ddtrace.llmobs._constants import OUTPUT_MESSAGES -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._integrations import BaseLLMIntegration from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.trace import Span @@ -34,9 +32,7 @@ def _llmobs_set_tags( operation: str = "", ) -> None: """Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags.""" - if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - parent_id = _get_llmobs_parent_id(span) or "undefined" - span._set_ctx_item(PARENT_ID_KEY, parent_id) + LLMObs._instance._activate_llmobs_span(span) parameters = {} if span.get_tag("bedrock.request.temperature"): parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0) diff --git a/ddtrace/llmobs/_integrations/langgraph.py b/ddtrace/llmobs/_integrations/langgraph.py index 08f3943e57e..1139c5faf29 100644 --- a/ddtrace/llmobs/_integrations/langgraph.py +++ b/ddtrace/llmobs/_integrations/langgraph.py @@ -8,12 +8,13 @@ from ddtrace.llmobs._constants import INPUT_VALUE from ddtrace.llmobs._constants import NAME from ddtrace.llmobs._constants import OUTPUT_VALUE +from ddtrace.llmobs._constants import PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_LINKS from ddtrace.llmobs._integrations.base import BaseLLMIntegration from ddtrace.llmobs._integrations.utils import format_langchain_io from ddtrace.llmobs._utils import _get_attr -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor from ddtrace.trace import Span from ddtrace.trace import tracer @@ -154,7 +155,7 @@ def _default_span_link(span: Span): the span is linked to its parent's input. """ return { - "span_id": str(_get_llmobs_parent_id(span)) or "undefined", + "span_id": span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID, "trace_id": "{:x}".format(span.trace_id), "attributes": {"from": "input", "to": "input"}, } diff --git a/ddtrace/llmobs/_llmobs.py b/ddtrace/llmobs/_llmobs.py index 14e12d3151e..fc4b273605c 100644 --- a/ddtrace/llmobs/_llmobs.py +++ b/ddtrace/llmobs/_llmobs.py @@ -10,6 +10,9 @@ import ddtrace from ddtrace import config from ddtrace import patch +from ddtrace._trace.context import Context +from ddtrace._trace.span import Span +from ddtrace._trace.tracer import Tracer from ddtrace.constants import ERROR_MSG from ddtrace.constants import ERROR_STACK from ddtrace.constants import ERROR_TYPE @@ -45,18 +48,18 @@ from ddtrace.llmobs._constants import OUTPUT_VALUE from ddtrace.llmobs._constants import PARENT_ID_KEY from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_LINKS from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING from ddtrace.llmobs._constants import TAGS +from ddtrace.llmobs._context import LLMObsContextProvider from ddtrace.llmobs._evaluators.runner import EvaluatorRunner from ddtrace.llmobs._utils import AnnotationContext -from ddtrace.llmobs._utils import _get_llmobs_parent_id from ddtrace.llmobs._utils import _get_ml_app from ddtrace.llmobs._utils import _get_session_id from ddtrace.llmobs._utils import _get_span_name -from ddtrace.llmobs._utils import _inject_llmobs_parent_id from ddtrace.llmobs._utils import _is_evaluation_span from ddtrace.llmobs._utils import safe_json from ddtrace.llmobs._utils import validate_prompt @@ -66,8 +69,6 @@ from ddtrace.llmobs.utils import ExportedLLMObsSpan from ddtrace.llmobs.utils import Messages from ddtrace.propagation.http import HTTPPropagator -from ddtrace.trace import Context -from ddtrace.trace import Span log = get_logger(__name__) @@ -90,6 +91,7 @@ class LLMObs(Service): def __init__(self, tracer=None): super(LLMObs, self).__init__() self.tracer = tracer or ddtrace.tracer + self._llmobs_context_provider = LLMObsContextProvider() self._llmobs_span_writer = LLMObsSpanWriter( is_agentless=config._llmobs_agentless_enabled, agentless_url="%s.%s" % (AGENTLESS_BASE_URL, config._dd_site), @@ -183,7 +185,7 @@ def _llmobs_span_event(cls, span: Span) -> Dict[str, Any]: ml_app = _get_ml_app(span) span._set_ctx_item(ML_APP, ml_app) - parent_id = str(_get_llmobs_parent_id(span) or "undefined") + parent_id = span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID llmobs_span_event = { "trace_id": "{:x}".format(span.trace_id), @@ -195,6 +197,7 @@ def _llmobs_span_event(cls, span: Span) -> Dict[str, Any]: "status": "error" if span.error else "ok", "meta": meta, "metrics": metrics, + "_dd": {"span_id": str(span.span_id), "trace_id": "{:x}".format(span.trace_id)}, } session_id = _get_session_id(span) if session_id is not None: @@ -283,6 +286,7 @@ def _stop_service(self) -> None: core.reset_listeners("trace.span_start", self._on_span_start) core.reset_listeners("trace.span_finish", self._on_span_finish) core.reset_listeners("http.span_inject", self._inject_llmobs_context) + core.reset_listeners("http.activate_distributed_headers", self._activate_llmobs_distributed_context) forksafe.unregister(self._child_after_fork) @@ -296,7 +300,7 @@ def enable( api_key: Optional[str] = None, env: Optional[str] = None, service: Optional[str] = None, - _tracer: Optional[ddtrace.trace.Tracer] = None, + _tracer: Optional[Tracer] = None, ) -> None: """ Enable LLM Observability tracing. @@ -364,7 +368,8 @@ def enable( # Register hooks for span events core.on("trace.span_start", cls._instance._on_span_start) core.on("trace.span_finish", cls._instance._on_span_finish) - core.on("http.span_inject", cls._instance._inject_llmobs_context) + core.on("http.span_inject", cls._inject_llmobs_context) + core.on("http.activate_distributed_headers", cls._activate_llmobs_distributed_context) atexit.register(cls.disable) telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True) @@ -490,23 +495,45 @@ def export_span(cls, span: Optional[Span] = None) -> Optional[ExportedLLMObsSpan """Returns a simple representation of a span to export its span and trace IDs. If no span is provided, the current active LLMObs-type span will be used. """ - if span: - try: - if span.span_type != SpanTypes.LLM: - log.warning("Span must be an LLMObs-generated span.") - return None - return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) - except (TypeError, AttributeError): - log.warning("Failed to export span. Span must be a valid Span object.") - return None - span = cls._instance.tracer.current_span() if span is None: - log.warning("No span provided and no active LLMObs-generated span found.") - return None - if span.span_type != SpanTypes.LLM: - log.warning("Span must be an LLMObs-generated span.") + span = cls._instance._current_span() + if span is None: + log.warning("No span provided and no active LLMObs-generated span found.") + return None + try: + if span.span_type != SpanTypes.LLM: + log.warning("Span must be an LLMObs-generated span.") + return None + return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) + except (TypeError, AttributeError): + log.warning("Failed to export span. Span must be a valid Span object.") return None - return ExportedLLMObsSpan(span_id=str(span.span_id), trace_id="{:x}".format(span.trace_id)) + + def _current_span(self) -> Optional[Span]: + """Returns the currently active LLMObs-generated span. + Note that there may be an active span represented by a context object + (i.e. a distributed trace) which will not be returned by this method. + """ + active = self._llmobs_context_provider.active() + return active if isinstance(active, Span) else None + + def _current_trace_context(self) -> Optional[Context]: + """Returns the context for the current LLMObs trace.""" + active = self._llmobs_context_provider.active() + if isinstance(active, Context): + return active + elif isinstance(active, Span): + return active.context + return None + + def _activate_llmobs_span(self, span: Span) -> None: + """Propagate the llmobs parent span's ID as the new span's parent ID and activate the new span.""" + llmobs_parent = self._llmobs_context_provider.active() + if llmobs_parent: + span._set_ctx_item(PARENT_ID_KEY, str(llmobs_parent.span_id)) + else: + span._set_ctx_item(PARENT_ID_KEY, ROOT_PARENT_ID) + self._llmobs_context_provider.activate(span) def _start_span( self, @@ -529,6 +556,7 @@ def _start_span( if name is None: name = operation_kind span = self.tracer.trace(name, resource=operation_kind, span_type=SpanTypes.LLM) + self._activate_llmobs_span(span) span._set_ctx_item(SPAN_KIND, operation_kind) if model_name is not None: span._set_ctx_item(MODEL_NAME, model_name) @@ -540,12 +568,6 @@ def _start_span( if ml_app is None: ml_app = _get_ml_app(span) span._set_ctx_item(ML_APP, ml_app) - if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None: - # For non-distributed traces or spans in the first service of a distributed trace, - # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now - # in these cases to avoid conflicting with the later propagated tags. - parent_id = _get_llmobs_parent_id(span) or "undefined" - span._set_ctx_item(PARENT_ID_KEY, str(parent_id)) return span @classmethod @@ -750,7 +772,7 @@ def annotate( such as `{prompt,completion,total}_tokens`. """ if span is None: - span = cls._instance.tracer.current_span() + span = cls._instance._current_span() if span is None: log.warning("No span provided and no active LLMObs-generated span found.") return @@ -1119,10 +1141,16 @@ def submit_evaluation( cls._instance._llmobs_eval_metric_writer.enqueue(evaluation_metric) - def _inject_llmobs_context(self, span_context: Context, request_headers: Dict[str, str]) -> None: - if self.enabled is False: + @classmethod + def _inject_llmobs_context(cls, span_context: Context, request_headers: Dict[str, str]) -> None: + if cls.enabled is False: return - _inject_llmobs_parent_id(span_context) + active_context = cls._instance._current_trace_context() + if active_context is None: + parent_id = ROOT_PARENT_ID + else: + parent_id = str(active_context.span_id) + span_context._meta[PROPAGATED_PARENT_ID_KEY] = parent_id @classmethod def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optional[Span] = None) -> Dict[str, str]: @@ -1141,9 +1169,31 @@ def inject_distributed_headers(cls, request_headers: Dict[str, str], span: Optio if span is None: log.warning("No span provided and no currently active span found.") return request_headers + if not isinstance(span, Span): + log.warning("span must be a valid Span object. Distributed context will not be injected.") + return request_headers HTTPPropagator.inject(span.context, request_headers) return request_headers + @classmethod + def _activate_llmobs_distributed_context(cls, request_headers: Dict[str, str], context: Context) -> None: + if cls.enabled is False: + return + if not context.trace_id or not context.span_id: + log.warning("Failed to extract trace/span ID from request headers.") + return + _parent_id = context._meta.get(PROPAGATED_PARENT_ID_KEY) + if _parent_id is None: + log.warning("Failed to extract LLMObs parent ID from request headers.") + return + try: + parent_id = int(_parent_id) + except ValueError: + log.warning("Failed to parse LLMObs parent ID from request headers.") + return + llmobs_context = Context(trace_id=context.trace_id, span_id=parent_id) + cls._instance._llmobs_context_provider.activate(llmobs_context) + @classmethod def activate_distributed_headers(cls, request_headers: Dict[str, str]) -> None: """ @@ -1158,12 +1208,8 @@ def activate_distributed_headers(cls, request_headers: Dict[str, str]) -> None: ) return context = HTTPPropagator.extract(request_headers) - if context.trace_id is None or context.span_id is None: - log.warning("Failed to extract trace ID or span ID from request headers.") - return - if PROPAGATED_PARENT_ID_KEY not in context._meta: - log.warning("Failed to extract LLMObs parent ID from request headers.") cls._instance.tracer.context_provider.activate(context) + cls._instance._activate_llmobs_distributed_context(request_headers, context) # initialize the default llmobs instance diff --git a/ddtrace/llmobs/_utils.py b/ddtrace/llmobs/_utils.py index 8861820002c..49d6d93b3f8 100644 --- a/ddtrace/llmobs/_utils.py +++ b/ddtrace/llmobs/_utils.py @@ -4,7 +4,6 @@ from typing import Optional from typing import Union -import ddtrace from ddtrace import config from ddtrace.ext import SpanTypes from ddtrace.internal.logger import get_logger @@ -16,8 +15,6 @@ from ddtrace.llmobs._constants import ML_APP from ddtrace.llmobs._constants import NAME from ddtrace.llmobs._constants import OPENAI_APM_SPAN_NAME -from ddtrace.llmobs._constants import PARENT_ID_KEY -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import VERTEXAI_APM_SPAN_NAME from ddtrace.trace import Span @@ -108,18 +105,6 @@ def _get_nearest_llmobs_ancestor(span: Span) -> Optional[Span]: return None -def _get_llmobs_parent_id(span: Span) -> Optional[str]: - """Return the span ID of the nearest LLMObs-type span in the span's ancestor tree. - In priority order: manually set parent ID tag, nearest LLMObs ancestor, local root's propagated parent ID tag. - """ - if span._get_ctx_item(PARENT_ID_KEY): - return span._get_ctx_item(PARENT_ID_KEY) - nearest_llmobs_ancestor = _get_nearest_llmobs_ancestor(span) - if nearest_llmobs_ancestor: - return str(nearest_llmobs_ancestor.span_id) - return span.get_tag(PROPAGATED_PARENT_ID_KEY) - - def _get_span_name(span: Span) -> str: if span.name in (LANGCHAIN_APM_SPAN_NAME, GEMINI_APM_SPAN_NAME, VERTEXAI_APM_SPAN_NAME) and span.resource != "": return span.resource @@ -164,10 +149,7 @@ def _get_ml_app(span: Span) -> str: def _get_session_id(span: Span) -> Optional[str]: - """ - Return the session ID for a given span, by checking the span's nearest LLMObs span ancestor. - Default to the span's trace ID. - """ + """Return the session ID for a given span, by checking the span's nearest LLMObs span ancestor.""" session_id = span._get_ctx_item(SESSION_ID) if session_id: return session_id @@ -180,23 +162,6 @@ def _get_session_id(span: Span) -> Optional[str]: return session_id -def _inject_llmobs_parent_id(span_context): - """Inject the LLMObs parent ID into the span context for reconnecting distributed LLMObs traces.""" - span = ddtrace.tracer.current_span() - if span is None: - log.warning("No active span to inject LLMObs parent ID info.") - return - if span.context is not span_context: - log.warning("The current active span and span_context do not match. Not injecting LLMObs parent ID.") - return - - if span.span_type == SpanTypes.LLM: - llmobs_parent_id = str(span.span_id) - else: - llmobs_parent_id = _get_llmobs_parent_id(span) - span_context._meta[PROPAGATED_PARENT_ID_KEY] = llmobs_parent_id or "undefined" - - def _unserializable_default_repr(obj): default_repr = "[Unserializable object: {}]".format(repr(obj)) log.warning("I/O object is not JSON serializable. Defaulting to placeholder value instead.") diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index c172c9adba9..39879786d6f 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -51,6 +51,7 @@ class LLMObsSpanEvent(TypedDict): meta: Dict[str, Any] metrics: Dict[str, Any] collection_errors: List[str] + _dd: Dict[str, str] class LLMObsEvaluationMetricEvent(TypedDict, total=False): diff --git a/releasenotes/notes/feat-llmobs-context-e4adabcb6894e4d8.yaml b/releasenotes/notes/feat-llmobs-context-e4adabcb6894e4d8.yaml new file mode 100644 index 00000000000..cfea978fa1e --- /dev/null +++ b/releasenotes/notes/feat-llmobs-context-e4adabcb6894e4d8.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + LLM Observability: Introduces an improved automated context management system for LLM Observability-specific spans. + Also modifies ``LLMObs.export_span()``, ``LLMObs.inject_distributed_headers()``, ``LLMObs.annotate()`` to default to the current active LLM Observability-specific span if ``span`` is not provided. diff --git a/tests/llmobs/_utils.py b/tests/llmobs/_utils.py index 3bbc4ffcef2..72021ec2136 100644 --- a/tests/llmobs/_utils.py +++ b/tests/llmobs/_utils.py @@ -181,6 +181,7 @@ def _llmobs_base_span_event( "meta": {"span.kind": span_kind}, "metrics": {}, "tags": _expected_llmobs_tags(span, tags=tags, error=error, session_id=session_id), + "_dd": {"span_id": str(span.span_id), "trace_id": "{:x}".format(span.trace_id)}, } if session_id: span_event["session_id"] = session_id @@ -568,6 +569,7 @@ def _expected_ragas_context_precision_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -585,6 +587,7 @@ def _expected_ragas_context_precision_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, ] @@ -612,6 +615,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -629,6 +633,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -646,6 +651,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -658,6 +664,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "meta": {"span.kind": "task", "metadata": {}}, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -675,6 +682,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -687,6 +695,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): "meta": {"span.kind": "task", "metadata": {}}, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -703,6 +712,7 @@ def _expected_ragas_faithfulness_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, ] @@ -727,6 +737,7 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -744,6 +755,7 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, { "trace_id": mock.ANY, @@ -761,5 +773,6 @@ def _expected_ragas_answer_relevancy_spans(ragas_inputs=None): }, "metrics": {}, "tags": expected_ragas_trace_tags(), + "_dd": {"span_id": mock.ANY, "trace_id": mock.ANY}, }, ] diff --git a/tests/llmobs/test_llmobs.py b/tests/llmobs/test_llmobs.py index 086e867c787..26826f2001d 100644 --- a/tests/llmobs/test_llmobs.py +++ b/tests/llmobs/test_llmobs.py @@ -2,7 +2,8 @@ from ddtrace.ext import SpanTypes from ddtrace.llmobs import _constants as const -from ddtrace.llmobs._utils import _get_llmobs_parent_id +from ddtrace.llmobs._constants import PARENT_ID_KEY +from ddtrace.llmobs._constants import ROOT_PARENT_ID from ddtrace.llmobs._utils import _get_session_id from tests.llmobs._utils import _expected_llmobs_llm_span_event @@ -41,19 +42,18 @@ def test_propagates_ignore_non_llmobs_spans(self, tracer, llmobs_events): assert "ml_app:test-ml-app" in llmobs_event["tags"] -def test_set_correct_parent_id(tracer): +def test_set_correct_parent_id(llmobs): """Test that the parent_id is set as the span_id of the nearest LLMObs span in the span's ancestor tree.""" - with tracer.trace("root"): - with tracer.trace("llm_span", span_type=SpanTypes.LLM) as llm_span: + with llmobs._instance.tracer.trace("root"): + with llmobs.workflow("llm_span") as llm_span: pass - assert _get_llmobs_parent_id(llm_span) is None - with tracer.trace("root_llm_span", span_type=SpanTypes.LLM) as root_span: - with tracer.trace("child_span") as child_span: - with tracer.trace("llm_span", span_type=SpanTypes.LLM) as grandchild_span: - pass - assert _get_llmobs_parent_id(root_span) is None - assert _get_llmobs_parent_id(child_span) == str(root_span.span_id) - assert _get_llmobs_parent_id(grandchild_span) == str(root_span.span_id) + assert llm_span._get_ctx_item(PARENT_ID_KEY) is ROOT_PARENT_ID + with llmobs.workflow("root_llm_span") as root_span: + assert root_span._get_ctx_item(PARENT_ID_KEY) is ROOT_PARENT_ID + with llmobs._instance.tracer.trace("child_span") as child_span: + assert child_span._get_ctx_item(PARENT_ID_KEY) is None + with llmobs.task("llm_span") as grandchild_span: + assert grandchild_span._get_ctx_item(PARENT_ID_KEY) == str(root_span.span_id) class TestSessionId: @@ -216,8 +216,7 @@ def test_model_and_provider_are_set(tracer, llmobs_events): def test_malformed_span_logs_error_instead_of_raising(tracer, llmobs_events, mock_llmobs_logs): """Test that a trying to create a span event from a malformed span will log an error instead of crashing.""" with tracer.trace("root_llm_span", span_type=SpanTypes.LLM) as llm_span: - # span does not have SPAN_KIND tag - pass + pass # span does not have SPAN_KIND tag mock_llmobs_logs.error.assert_called_with( "Error generating LLMObs span event for span %s, likely due to malformed span", llm_span, exc_info=True ) @@ -229,14 +228,9 @@ def test_only_generate_span_events_from_llmobs_spans(tracer, llmobs_events): with tracer.trace("root_llm_span", service="tests.llmobs", span_type=SpanTypes.LLM) as root_span: root_span._set_ctx_item(const.SPAN_KIND, "llm") with tracer.trace("child_span"): - with tracer.trace("llm_span", span_type=SpanTypes.LLM) as grandchild_span: - grandchild_span._set_ctx_item(const.SPAN_KIND, "llm") - expected_grandchild_llmobs_span = _expected_llmobs_llm_span_event(grandchild_span, "llm") - expected_grandchild_llmobs_span["parent_id"] = str(root_span.span_id) - - assert len(llmobs_events) == 2 - assert llmobs_events[1] == _expected_llmobs_llm_span_event(root_span, "llm") - assert llmobs_events[0] == expected_grandchild_llmobs_span + pass + assert len(llmobs_events) == 1 + assert llmobs_events[0] == _expected_llmobs_llm_span_event(root_span, "llm") def test_utf_non_ascii_io(llmobs, llmobs_backend): diff --git a/tests/llmobs/test_llmobs_service.py b/tests/llmobs/test_llmobs_service.py index 2fe3e1fbfab..2c1834ae910 100644 --- a/tests/llmobs/test_llmobs_service.py +++ b/tests/llmobs/test_llmobs_service.py @@ -22,7 +22,6 @@ from ddtrace.llmobs._constants import OUTPUT_DOCUMENTS from ddtrace.llmobs._constants import OUTPUT_MESSAGES from ddtrace.llmobs._constants import OUTPUT_VALUE -from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY from ddtrace.llmobs._constants import SESSION_ID from ddtrace.llmobs._constants import SPAN_KIND from ddtrace.llmobs._constants import SPAN_START_WHILE_DISABLED_WARNING @@ -831,7 +830,7 @@ def test_export_span_no_specified_span_no_active_span_raises_warning(llmobs, moc def test_export_span_active_span_not_llmobs_span_raises_warning(llmobs, mock_llmobs_logs): with llmobs._instance.tracer.trace("non_llmobs_span"): llmobs.export_span() - mock_llmobs_logs.warning.assert_called_once_with("Span must be an LLMObs-generated span.") + mock_llmobs_logs.warning.assert_called_once_with("No span provided and no active LLMObs-generated span found.") def test_export_span_no_specified_span_returns_exported_active_span(llmobs): @@ -1245,7 +1244,7 @@ def test_inject_distributed_headers_span_calls_httppropagator_inject(llmobs, moc def test_inject_distributed_headers_current_active_span_injected(llmobs, mock_llmobs_logs): - span = llmobs._instance.tracer.trace("test_span") + span = llmobs.workflow("test_span") with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.inject") as mock_inject: llmobs.inject_distributed_headers({}, span=None) assert mock_inject.call_count == 1 @@ -1270,23 +1269,23 @@ def test_activate_distributed_headers_calls_httppropagator_extract(llmobs, mock_ def test_activate_distributed_headers_no_trace_id_does_nothing(llmobs, mock_llmobs_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(span_id="123", meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(span_id=123) llmobs.activate_distributed_headers({}) assert mock_extract.call_count == 1 - mock_llmobs_logs.warning.assert_called_once_with("Failed to extract trace ID or span ID from request headers.") + mock_llmobs_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") def test_activate_distributed_headers_no_span_id_does_nothing(llmobs, mock_llmobs_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - mock_extract.return_value = Context(trace_id="123", meta={PROPAGATED_PARENT_ID_KEY: "123"}) + mock_extract.return_value = Context(trace_id=123) llmobs.activate_distributed_headers({}) assert mock_extract.call_count == 1 - mock_llmobs_logs.warning.assert_called_once_with("Failed to extract trace ID or span ID from request headers.") + mock_llmobs_logs.warning.assert_called_once_with("Failed to extract trace/span ID from request headers.") def test_activate_distributed_headers_no_llmobs_parent_id_does_nothing(llmobs, mock_llmobs_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - dummy_context = Context(trace_id="123", span_id="456") + dummy_context = Context(trace_id=123, span_id=456) mock_extract.return_value = dummy_context with mock.patch("ddtrace.llmobs.LLMObs._instance.tracer.context_provider.activate") as mock_activate: llmobs.activate_distributed_headers({}) @@ -1297,7 +1296,7 @@ def test_activate_distributed_headers_no_llmobs_parent_id_does_nothing(llmobs, m def test_activate_distributed_headers_activates_context(llmobs, mock_llmobs_logs): with mock.patch("ddtrace.llmobs._llmobs.HTTPPropagator.extract") as mock_extract: - dummy_context = Context(trace_id="123", span_id="456", meta={PROPAGATED_PARENT_ID_KEY: "789"}) + dummy_context = Context(trace_id=123, span_id=456) mock_extract.return_value = dummy_context with mock.patch("ddtrace.llmobs.LLMObs._instance.tracer.context_provider.activate") as mock_activate: llmobs.activate_distributed_headers({}) diff --git a/tests/llmobs/test_propagation.py b/tests/llmobs/test_propagation.py index 2a2d8a92695..7a8c52f9f94 100644 --- a/tests/llmobs/test_propagation.py +++ b/tests/llmobs/test_propagation.py @@ -1,55 +1,47 @@ import json import os -from ddtrace.ext import SpanTypes +from ddtrace.llmobs._constants import PARENT_ID_KEY from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY -from ddtrace.llmobs._utils import _get_llmobs_parent_id -from ddtrace.llmobs._utils import _inject_llmobs_parent_id -from ddtrace.propagation.http import HTTPPropagator -from tests.utils import DummyTracer +from ddtrace.llmobs._constants import ROOT_PARENT_ID -def test_inject_llmobs_parent_id_no_llmobs_span(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("Non-LLMObs span") as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == "undefined" +def test_inject_llmobs_parent_id_no_llmobs_span(llmobs): + with llmobs._instance.tracer.trace("Non-LLMObs span"): + with llmobs._instance.tracer.trace("Non-LLMObs span") as span: + llmobs._inject_llmobs_context(span.context, {}) + assert span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == ROOT_PARENT_ID -def test_inject_llmobs_parent_id_simple(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - _inject_llmobs_parent_id(root_span.context) - assert root_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(root_span.span_id) +def test_inject_llmobs_parent_id_simple(llmobs): + with llmobs.workflow("LLMObs span") as span: + llmobs._inject_llmobs_context(span.context, {}) + assert span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(span.span_id) -def test_inject_llmobs_parent_id_nested_llmobs_non_llmobs(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: - with dummy_tracer.trace("Non-LLMObs span") as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(root_span.span_id) +def test_inject_llmobs_parent_id_nested_llmobs_non_llmobs(llmobs): + with llmobs.workflow("LLMObs span") as root_span: + with llmobs._instance.tracer.trace("Non-LLMObs span") as span: + llmobs._inject_llmobs_context(span.context, {}) + assert span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(root_span.span_id) -def test_inject_llmobs_parent_id_non_llmobs_root_span(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as child_span: - _inject_llmobs_parent_id(child_span.context) - assert child_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(child_span.span_id) +def test_inject_llmobs_parent_id_non_llmobs_root_span(llmobs): + with llmobs._instance.tracer.trace("Non-LLMObs span"): + with llmobs.workflow("LLMObs span") as span: + llmobs._inject_llmobs_context(span.context, {}) + assert span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(span.span_id) -def test_inject_llmobs_parent_id_nested_llmobs_spans(): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("LLMObs child span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Last LLMObs child span", span_type=SpanTypes.LLM) as last_llmobs_span: - _inject_llmobs_parent_id(last_llmobs_span.context) +def test_inject_llmobs_parent_id_nested_llmobs_spans(llmobs): + with llmobs.workflow("LLMObs span"): + with llmobs.workflow("LLMObs child span"): + with llmobs.workflow("Last LLMObs child span") as last_llmobs_span: + llmobs._inject_llmobs_context(last_llmobs_span.context, {}) assert last_llmobs_span.context._meta.get(PROPAGATED_PARENT_ID_KEY) == str(last_llmobs_span.span_id) -def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess): +def test_propagate_correct_llmobs_parent_id_simple(ddtrace_run_python_code_in_subprocess, llmobs): """Test that the correct LLMObs parent ID is propagated in the headers in a simple distributed scenario. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a LLMObs span. @@ -57,39 +49,31 @@ def test_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess """ code = """ import json -import mock -from ddtrace.internal.utils.http import Response +from ddtrace import tracer from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -with mock.patch( - "ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"), -): - LLMObs.enable(ml_app="test-app", api_key="", agentless_enabled=True) - with LLMObs.workflow("LLMObs span") as root_span: - with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span: - headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - HTTPPropagator.inject(child_span.context, headers) +with LLMObs.workflow("LLMObs span") as root_span: + with tracer.trace("Non-LLMObs span"): + headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} + LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) - assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as span: + llmobs.activate_distributed_headers(headers) + with llmobs.workflow("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span._get_ctx_item(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess): +def test_propagate_llmobs_parent_id_complex(ddtrace_run_python_code_in_subprocess, llmobs): """Test that the correct LLMObs parent ID is propagated in the headers in a more complex trace. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a non-LLMObs local root span and a LLMObs child span. @@ -97,50 +81,34 @@ def test_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess): """ code = """ import json -import mock -from ddtrace.internal.utils.http import Response +from ddtrace import tracer from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -with mock.patch( - "ddtrace.internal.writer.HTTPWriter._send_payload", return_value=Response(status=200, body="{}"), -): - from ddtrace import auto # simulate ddtrace-run startup to ensure env var configs also propagate - with LLMObs.workflow("LLMObs span") as root_span: - with LLMObs._instance.tracer.trace("Non-LLMObs span") as child_span: - headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - HTTPPropagator.inject(child_span.context, headers) +with LLMObs.workflow("LLMObs span") as root_span: + with tracer.trace("Non-LLMObs span"): + headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} + LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env.update( - { - "DD_LLMOBS_ENABLED": "1", - "DD_TRACE_ENABLED": "0", - "DD_AGENTLESS_ENABLED": "1", - "DD_API_KEY": "", - "DD_LLMOBS_ML_APP": "test-app", - } - ) - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) - assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("Non-LLMObs span") as span: - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as llm_span: + llmobs.activate_distributed_headers(headers) + with llmobs._instance.tracer.trace("Non-LLMObs span") as span: + with llmobs.llm(model_name="llm_model", name="LLMObs span") as llm_span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] - assert _get_llmobs_parent_id(llm_span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span._get_ctx_item(PARENT_ID_KEY) is None + assert llm_span._get_ctx_item(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_subprocess): - """Test that the correct LLMObs parent ID ('undefined') is extracted from headers in a simple distributed scenario. +def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(ddtrace_run_python_code_in_subprocess, llmobs): + """Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario. Service A (subprocess) has spans, but none are LLMObs spans. Service B (outside subprocess) has a LLMObs span. Service B's span should have no LLMObs parent ID as there are no LLMObs spans from service A. @@ -148,64 +116,60 @@ def test_no_llmobs_parent_id_propagated_if_no_llmobs_spans(run_python_code_in_su code = """ import json +from ddtrace import tracer from ddtrace.llmobs import LLMObs from ddtrace.propagation.http import HTTPPropagator -LLMObs.enable(ml_app="ml-app", agentless_enabled=True, api_key="") -with LLMObs._instance.tracer.trace("Non-LLMObs span") as root_span: +with tracer.trace("Non-LLMObs span") as span: headers = {} - HTTPPropagator.inject(root_span.context, headers) + LLMObs.inject_distributed_headers(headers) + HTTPPropagator.inject(span.context, headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) - assert stderr == b"", (stdout, stderr) headers = json.loads(stdout.decode()) - context = HTTPPropagator.extract(headers) - dummy_tracer = DummyTracer() - dummy_tracer.context_provider.activate(context) - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as span: - assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == "undefined" + llmobs.activate_distributed_headers(headers) + with llmobs.workflow("LLMObs span") as span: + assert str(span.parent_id) == headers.get("x-datadog-parent-id") + assert span._get_ctx_item(PARENT_ID_KEY) == ROOT_PARENT_ID def test_inject_distributed_headers_simple(llmobs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as root_span: + with llmobs.workflow("LLMObs span") as root_span: request_headers = llmobs.inject_distributed_headers({}, span=root_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert "llmobs_parent_id:{}".format(root_span.span_id) in request_headers.get("tracestate") def test_inject_distributed_headers_nested_llmobs_non_llmobs(llmobs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Non-LLMObs span") as child_span: + with llmobs.workflow("LLMObs span") as root_span: + with llmobs._instance.tracer.trace("Non-LLMObs span") as child_span: request_headers = llmobs.inject_distributed_headers({}, span=child_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert "llmobs_parent_id:{}".format(root_span.span_id) in request_headers.get("tracestate") def test_inject_distributed_headers_non_llmobs_root_span(llmobs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span"): - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM) as child_span: + with llmobs._instance.tracer.trace("Non-LLMObs span"): + with llmobs.workflow("LLMObs span") as child_span: request_headers = llmobs.inject_distributed_headers({}, span=child_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert "llmobs_parent_id:{}".format(child_span.span_id) in request_headers.get("tracestate") def test_inject_distributed_headers_nested_llmobs_spans(llmobs): - dummy_tracer = DummyTracer() - with dummy_tracer.trace("LLMObs span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("LLMObs child span", span_type=SpanTypes.LLM): - with dummy_tracer.trace("Last LLMObs child span", span_type=SpanTypes.LLM) as last_llmobs_span: + with llmobs.workflow("LLMObs span"): + with llmobs.workflow("LLMObs child span"): + with llmobs.workflow("LLMObs grandchild span") as last_llmobs_span: request_headers = llmobs.inject_distributed_headers({}, span=last_llmobs_span) - assert PROPAGATED_PARENT_ID_KEY in request_headers["x-datadog-tags"] + assert "llmobs_parent_id:{}".format(last_llmobs_span.span_id) in request_headers.get("tracestate") -def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple(run_python_code_in_subprocess, llmobs): +def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( + ddtrace_run_python_code_in_subprocess, llmobs +): """Test that the correct LLMObs parent ID is propagated in the headers in a simple distributed scenario. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a LLMObs span. @@ -214,11 +178,10 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( code = """ import json -from ddtrace.trace import tracer -from ddtrace.ext import SpanTypes +from ddtrace import tracer from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") +LLMObs.enable(ml_app="test-app", site="datad0g.com", api_key="dummy-key", agentless_enabled=True) with LLMObs.workflow("LLMObs span") as root_span: with tracer.trace("Non-LLMObs span") as child_span: @@ -228,57 +191,55 @@ def test_activate_distributed_headers_propagate_correct_llmobs_parent_id_simple( print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) headers = json.loads(stdout.decode()) llmobs.activate_distributed_headers(headers) with llmobs.workflow("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span._get_ctx_item(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_activate_distributed_headers_propagate_llmobs_parent_id_complex(run_python_code_in_subprocess, llmobs): +def test_activate_distributed_headers_propagate_llmobs_parent_id_complex(ddtrace_run_python_code_in_subprocess, llmobs): """Test that the correct LLMObs parent ID is propagated in the headers in a more complex trace. Service A (subprocess) has a root LLMObs span and a non-LLMObs child span. Service B (outside subprocess) has a non-LLMObs local root span and a LLMObs child span. - Both of service B's spans should have the same LLMObs parent ID (Root LLMObs span from service A). + Service B's LLMObs span should have the LLMObs parent ID (Root LLMObs span from service A). """ code = """ import json -from ddtrace.trace import tracer -from ddtrace.ext import SpanTypes +from ddtrace import tracer from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") +LLMObs.enable(ml_app="test-app", site="datad0g.com", api_key="dummy-key", agentless_enabled=True) with LLMObs.workflow("LLMObs span") as root_span: with tracer.trace("Non-LLMObs span") as child_span: headers = {"_DD_LLMOBS_SPAN_ID": str(root_span.span_id)} - headers = LLMObs.inject_distributed_headers(headers, span=child_span) + headers = LLMObs.inject_distributed_headers(headers) print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) headers = json.loads(stdout.decode()) llmobs.activate_distributed_headers(headers) - dummy_tracer = DummyTracer() - with dummy_tracer.trace("Non-LLMObs span") as span: + with llmobs._instance.tracer.trace("Non-LLMObs span") as span: with llmobs.llm(model_name="llm_model", name="LLMObs span") as llm_span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == headers["_DD_LLMOBS_SPAN_ID"] - assert _get_llmobs_parent_id(llm_span) == headers["_DD_LLMOBS_SPAN_ID"] + assert span._get_ctx_item(PARENT_ID_KEY) is None + assert llm_span._get_ctx_item(PARENT_ID_KEY) == headers["_DD_LLMOBS_SPAN_ID"] -def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_python_code_in_subprocess, llmobs): +def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans( + ddtrace_run_python_code_in_subprocess, llmobs +): """Test that the correct LLMObs parent ID (None) is extracted from the headers in a simple distributed scenario. Service A (subprocess) has spans, but none are LLMObs spans. Service B (outside subprocess) has a LLMObs span. @@ -287,11 +248,9 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_ code = """ import json -from ddtrace.trace import tracer +from ddtrace import tracer from ddtrace.llmobs import LLMObs -LLMObs.enable(ml_app="test-app", api_key="") - with tracer.trace("Non-LLMObs span") as root_span: headers = {} headers = LLMObs.inject_distributed_headers(headers, span=root_span) @@ -299,13 +258,12 @@ def test_activate_distributed_headers_does_not_propagate_if_no_llmobs_spans(run_ print(json.dumps(headers)) """ env = os.environ.copy() - env["DD_LLMOBS_ENABLED"] = "1" - env["DD_TRACE_ENABLED"] = "0" - stdout, stderr, status, _ = run_python_code_in_subprocess(code=code, env=env) + env.update({"DD_LLMOBS_ML_APP": "test-app", "DD_LLMOBS_ENABLED": "1", "DD_TRACE_ENABLED": "0"}) + stdout, stderr, status, _ = ddtrace_run_python_code_in_subprocess(code=code, env=env) assert status == 0, (stdout, stderr) headers = json.loads(stdout.decode()) llmobs.activate_distributed_headers(headers) with llmobs.task("LLMObs span") as span: assert str(span.parent_id) == headers["x-datadog-parent-id"] - assert _get_llmobs_parent_id(span) == "undefined" + assert span._get_ctx_item(PARENT_ID_KEY) == ROOT_PARENT_ID