Skip to content

Commit

Permalink
llmobs-specific context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Yun-Kim committed Feb 6, 2025
1 parent 25e7e2e commit 561d659
Show file tree
Hide file tree
Showing 14 changed files with 303 additions and 271 deletions.
1 change: 1 addition & 0 deletions ddtrace/contrib/internal/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# We have parsed a trace id from headers, and we do not already
# have a context with the same trace id active
tracer.context_provider.activate(context)
core.dispatch("http.activate_distributed_headers", (request_headers, context))


def _flatten(
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/llmobs/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
DROPPED_IO_COLLECTION_ERROR = "dropped_io"
DROPPED_VALUE_TEXT = "[This value has been dropped because this span's size exceeds the 1MB size limit.]"

ROOT_PARENT_ID = "undefined"

# Set for traces of evaluator integrations e.g. `runner.integration:ragas`.
# Used to differentiate traces of Datadog-run operations vs user-application operations.
RUNNER_IS_INTEGRATION_SPAN_TAG = "runner.integration"
Expand Down
59 changes: 59 additions & 0 deletions ddtrace/llmobs/_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import contextvars
from typing import Optional
from typing import Union

from ddtrace._trace.context import Context
from ddtrace._trace.provider import DefaultContextProvider
from ddtrace._trace.span import Span
from ddtrace.ext import SpanTypes


ContextTypeValue = Optional[Union[Context, Span]]


_DD_LLMOBS_CONTEXTVAR: contextvars.ContextVar[ContextTypeValue] = contextvars.ContextVar(
"datadog_llmobs_contextvar",
default=None,
)


class LLMObsContextProvider(DefaultContextProvider):
"""Context provider that retrieves contexts from a context variable.
It is suitable for synchronous programming and for asynchronous executors
that support contextvars.
"""

def __init__(self) -> None:
super(DefaultContextProvider, self).__init__()
_DD_LLMOBS_CONTEXTVAR.set(None)

def _has_active_context(self) -> bool:
"""Returns whether there is an active context in the current execution."""
ctx = _DD_LLMOBS_CONTEXTVAR.get()
return ctx is not None

def _update_active(self, span: Span) -> Optional[Span]:
"""Updates the active LLMObs span.
The active span is updated to be the span's closest unfinished LLMObs ancestor span.
"""
if not span.finished:
return span
new_active: Optional[Span] = span
while new_active and new_active.finished:
new_active = new_active._parent
if new_active and not new_active.finished and new_active.span_type == SpanTypes.LLM:
break
self.activate(new_active)
return new_active

def activate(self, ctx: ContextTypeValue) -> None:
"""Makes the given context active in the current execution."""
_DD_LLMOBS_CONTEXTVAR.set(ctx)
super(DefaultContextProvider, self).activate(ctx)

def active(self) -> ContextTypeValue:
"""Returns the active span or context for the current execution."""
item = _DD_LLMOBS_CONTEXTVAR.get()
if isinstance(item, Span):
return self._update_active(item)
return item
28 changes: 10 additions & 18 deletions ddtrace/llmobs/_integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
from ddtrace.internal.telemetry import telemetry_writer
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
from ddtrace.internal.utils.formats import asbool
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._llmobs import LLMObs
from ddtrace.llmobs._log_writer import V2LogWriter
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.settings import IntegrationConfig
from ddtrace.trace import Pin
from ddtrace.trace import Span
Expand Down Expand Up @@ -132,21 +129,16 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
span.set_tag(_SPAN_MEASURED_KEY)
self._set_base_span_tags(span, **kwargs)
if submit_to_llmobs and self.llmobs_enabled:
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
# For non-distributed traces or spans in the first service of a distributed trace,
# The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now
# in these cases to avoid conflicting with the later propagated tags.
parent_id = _get_llmobs_parent_id(span) or "undefined"
span._set_ctx_item(PARENT_ID_KEY, str(parent_id))
telemetry_writer.add_count_metric(
namespace=TELEMETRY_NAMESPACE.MLOBS,
name="span.start",
value=1,
tags=(
("integration", self._integration_name),
("autoinstrumented", "true"),
),
)
LLMObs._instance._activate_llmobs_span(span)
telemetry_writer.add_count_metric(
namespace=TELEMETRY_NAMESPACE.MLOBS,
name="span.start",
value=1,
tags=(
("integration", self._integration_name),
("autoinstrumented", "true"),
),
)
return span

@classmethod
Expand Down
8 changes: 2 additions & 6 deletions ddtrace/llmobs/_integrations/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@
from typing import Optional

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs._constants import INPUT_MESSAGES
from ddtrace.llmobs._constants import METADATA
from ddtrace.llmobs._constants import METRICS
from ddtrace.llmobs._constants import MODEL_NAME
from ddtrace.llmobs._constants import MODEL_PROVIDER
from ddtrace.llmobs._constants import OUTPUT_MESSAGES
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import PROPAGATED_PARENT_ID_KEY
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._integrations import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import get_llmobs_metrics_tags
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.trace import Span


Expand All @@ -34,9 +32,7 @@ def _llmobs_set_tags(
operation: str = "",
) -> None:
"""Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags."""
if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
parent_id = _get_llmobs_parent_id(span) or "undefined"
span._set_ctx_item(PARENT_ID_KEY, parent_id)
LLMObs._instance._activate_llmobs_span(span)
parameters = {}
if span.get_tag("bedrock.request.temperature"):
parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0)
Expand Down
5 changes: 3 additions & 2 deletions ddtrace/llmobs/_integrations/langgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
from ddtrace.llmobs._constants import INPUT_VALUE
from ddtrace.llmobs._constants import NAME
from ddtrace.llmobs._constants import OUTPUT_VALUE
from ddtrace.llmobs._constants import PARENT_ID_KEY
from ddtrace.llmobs._constants import ROOT_PARENT_ID
from ddtrace.llmobs._constants import SPAN_KIND
from ddtrace.llmobs._constants import SPAN_LINKS
from ddtrace.llmobs._integrations.base import BaseLLMIntegration
from ddtrace.llmobs._integrations.utils import format_langchain_io
from ddtrace.llmobs._utils import _get_attr
from ddtrace.llmobs._utils import _get_llmobs_parent_id
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
from ddtrace.trace import Span
from ddtrace.trace import tracer
Expand Down Expand Up @@ -154,7 +155,7 @@ def _default_span_link(span: Span):
the span is linked to its parent's input.
"""
return {
"span_id": str(_get_llmobs_parent_id(span)) or "undefined",
"span_id": span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID,
"trace_id": "{:x}".format(span.trace_id),
"attributes": {"from": "input", "to": "input"},
}
Expand Down
Loading

0 comments on commit 561d659

Please sign in to comment.