chore(llmobs): migrate to core api
mabdinur committed Feb 10, 2025
1 parent 848c921 commit 92f4faa
Showing 18 changed files with 289 additions and 274 deletions.
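
The change is mechanical across every file shown: the span-level context helpers (span._set_ctx_item, span._set_ctx_items, span._get_ctx_item) are replaced by the module-level ddtrace.internal.core API (core.set_item, core.set_items, core.get_item), and each touched module gains the corresponding import. A minimal before/after sketch of the pattern, assuming core.set_item and core.get_item read and write the current execution context rather than a specific span object ("my.key" is an illustrative key, not one used by this commit):

    from ddtrace.internal import core

    # Before: the value was attached to a concrete span object.
    # span._set_ctx_item("my.key", "value")

    # After: the value goes through the shared core context API, so the
    # writer no longer needs a span reference in hand.
    core.set_item("my.key", "value")
    assert core.get_item("my.key") == "value"
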
3 changes: 2 additions & 1 deletion ddtrace/contrib/internal/openai/_endpoint_hooks.py
@@ -8,6 +8,7 @@
 from ddtrace.contrib.internal.openai.utils import _loop_handler
 from ddtrace.contrib.internal.openai.utils import _process_finished_stream
 from ddtrace.contrib.internal.openai.utils import _tag_tool_calls
+from ddtrace.internal import core
 from ddtrace.internal.utils.version import parse_version


@@ -248,7 +249,7 @@ def _record_request(self, pin, integration, instance, span, args, kwargs):
             if kwargs.get("stream_options", {}).get("include_usage", None) is not None:
                 # Only perform token chunk auto-extraction if this option is not explicitly set
                 return
-            span._set_ctx_item("_dd.auto_extract_token_chunk", True)
+            core.set_item("_dd.auto_extract_token_chunk", True)
             stream_options = kwargs.get("stream_options", {})
             stream_options["include_usage"] = True
             kwargs["stream_options"] = stream_options
5 changes: 3 additions & 2 deletions ddtrace/contrib/internal/openai/utils.py
@@ -8,6 +8,7 @@

 import wrapt

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._utils import _get_attr

@@ -87,7 +88,7 @@ def __next__(self):

     def _extract_token_chunk(self, chunk):
         """Attempt to extract the token chunk (last chunk in the stream) from the streamed response."""
-        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+        if not core.get_item("_dd.auto_extract_token_chunk"):
             return
         choices = getattr(chunk, "choices")
         if not choices:
@@ -153,7 +154,7 @@ async def __anext__(self):

     async def _extract_token_chunk(self, chunk):
         """Attempt to extract the token chunk (last chunk in the stream) from the streamed response."""
-        if not self._dd_span._get_ctx_item("_dd.auto_extract_token_chunk"):
+        if not core.get_item("_dd.auto_extract_token_chunk"):
             return
         choices = getattr(chunk, "choices")
         if not choices:
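
Note that this file is the read side of the flag written in _endpoint_hooks.py above: _record_request stores _dd.auto_extract_token_chunk through the core API, and both stream wrappers check it through the same API, so the producer/consumer pairing survives the migration unchanged. A condensed sketch of that flow, with the wrapper bodies elided:

    from ddtrace.internal import core

    # Producer (see _record_request above): opt this request in to
    # token-chunk auto-extraction.
    core.set_item("_dd.auto_extract_token_chunk", True)

    # Consumer (see _extract_token_chunk above): bail out unless the
    # flag was set earlier in the same execution flow.
    if not core.get_item("_dd.auto_extract_token_chunk"):
        pass  # the real wrapper returns early here
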
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_evaluators/ragas/answer_relevancy.py
@@ -3,6 +3,7 @@
 from typing import Tuple
 from typing import Union

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._constants import EVALUATION_SPAN_METADATA
 from ddtrace.llmobs._constants import IS_EVALUATION_SPAN
@@ -85,7 +86,7 @@ def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]
         with self.llmobs_service.workflow(
             "dd-ragas.answer_relevancy", ml_app=_get_ml_app_for_ragas_trace(span_event)
         ) as ragas_ar_workflow:
-            ragas_ar_workflow._set_ctx_item(IS_EVALUATION_SPAN, True)
+            core.set_item(IS_EVALUATION_SPAN, True)
             try:
                 evaluation_metadata[EVALUATION_SPAN_METADATA] = self.llmobs_service.export_span(span=ragas_ar_workflow)

3 changes: 2 additions & 1 deletion ddtrace/llmobs/_evaluators/ragas/context_precision.py
@@ -3,6 +3,7 @@
 from typing import Tuple
 from typing import Union

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._constants import EVALUATION_KIND_METADATA
 from ddtrace.llmobs._constants import EVALUATION_SPAN_METADATA
@@ -83,7 +84,7 @@ def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]
         with self.llmobs_service.workflow(
             "dd-ragas.context_precision", ml_app=_get_ml_app_for_ragas_trace(span_event)
         ) as ragas_cp_workflow:
-            ragas_cp_workflow._set_ctx_item(IS_EVALUATION_SPAN, True)
+            core.set_item(IS_EVALUATION_SPAN, True)
             try:
                 evaluation_metadata[EVALUATION_SPAN_METADATA] = self.llmobs_service.export_span(span=ragas_cp_workflow)

3 changes: 2 additions & 1 deletion ddtrace/llmobs/_evaluators/ragas/faithfulness.py
@@ -5,6 +5,7 @@
 from typing import Tuple
 from typing import Union

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._constants import EVALUATION_KIND_METADATA
 from ddtrace.llmobs._constants import EVALUATION_SPAN_METADATA
@@ -97,7 +98,7 @@ def evaluate(self, span_event: dict) -> Tuple[Union[float, str], Optional[dict]]
         with self.llmobs_service.workflow(
             "dd-ragas.faithfulness", ml_app=_get_ml_app_for_ragas_trace(span_event)
         ) as ragas_faithfulness_workflow:
-            ragas_faithfulness_workflow._set_ctx_item(IS_EVALUATION_SPAN, True)
+            core.set_item(IS_EVALUATION_SPAN, True)
             try:
                 evaluation_metadata[EVALUATION_SPAN_METADATA] = self.llmobs_service.export_span(
                     span=ragas_faithfulness_workflow
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_integrations/anthropic.py
@@ -5,6 +5,7 @@
 from typing import List
 from typing import Optional

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._constants import INPUT_MESSAGES
 from ddtrace.llmobs._constants import METADATA
@@ -67,7 +68,7 @@ def _llmobs_set_tags(
         if not span.error and response is not None:
             output_messages = self._extract_output_message(response)

-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "llm",
                 MODEL_NAME: span.get_tag("anthropic.request.model") or "",
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_integrations/base.py
@@ -11,6 +11,7 @@
 from ddtrace.constants import _SPAN_MEASURED_KEY
 from ddtrace.contrib.internal.trace_utils import int_service
 from ddtrace.ext import SpanTypes
+from ddtrace.internal import core
 from ddtrace.internal.agent import get_stats_url
 from ddtrace.internal.dogstatsd import get_dogstatsd_client
 from ddtrace.internal.hostname import get_hostname
@@ -137,7 +138,7 @@ def trace(self, pin: Pin, operation_id: str, submit_to_llmobs: bool = False, **k
                 # The LLMObs parent ID tag is not set at span start time. We need to manually set the parent ID tag now
                 # in these cases to avoid conflicting with the later propagated tags.
                 parent_id = _get_llmobs_parent_id(span) or "undefined"
-                span._set_ctx_item(PARENT_ID_KEY, str(parent_id))
+                core.set_item(PARENT_ID_KEY, str(parent_id))
         telemetry_writer.add_count_metric(
             namespace=TELEMETRY_NAMESPACE.MLOBS,
             name="span.start",
5 changes: 3 additions & 2 deletions ddtrace/llmobs/_integrations/bedrock.py
@@ -3,6 +3,7 @@
 from typing import List
 from typing import Optional

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.llmobs._constants import INPUT_MESSAGES
 from ddtrace.llmobs._constants import METADATA
@@ -36,7 +37,7 @@ def _llmobs_set_tags(
         """Extract prompt/response tags from a completion and set them as temporary "_ml_obs.*" tags."""
         if span.get_tag(PROPAGATED_PARENT_ID_KEY) is None:
             parent_id = _get_llmobs_parent_id(span) or "undefined"
-            span._set_ctx_item(PARENT_ID_KEY, parent_id)
+            core.set_item(PARENT_ID_KEY, parent_id)
         parameters = {}
         if span.get_tag("bedrock.request.temperature"):
             parameters["temperature"] = float(span.get_tag("bedrock.request.temperature") or 0.0)
@@ -48,7 +49,7 @@
         output_messages = [{"content": ""}]
         if not span.error and response is not None:
             output_messages = self._extract_output_message(response)
-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "llm",
                 MODEL_NAME: span.get_tag("bedrock.request.model") or "",
3 changes: 2 additions & 1 deletion ddtrace/llmobs/_integrations/gemini.py
@@ -4,6 +4,7 @@
 from typing import List
 from typing import Optional

+from ddtrace.internal import core
 from ddtrace.internal.utils import get_argument_value
 from ddtrace.llmobs._constants import INPUT_MESSAGES
 from ddtrace.llmobs._constants import METADATA
@@ -51,7 +52,7 @@ def _llmobs_set_tags(
         if not span.error and response is not None:
             output_messages = self._extract_output_message(response)

-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "llm",
                 MODEL_NAME: span.get_tag("google_generativeai.request.model") or "",
49 changes: 25 additions & 24 deletions ddtrace/llmobs/_integrations/langchain.py
@@ -6,6 +6,7 @@
 from typing import Optional
 from typing import Union

+from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
 from ddtrace.internal.utils import ArgumentError
 from ddtrace.internal.utils import get_argument_value
@@ -128,7 +129,7 @@ def _llmobs_set_metadata(self, span: Span, model_provider: Optional[str] = None)
         if max_tokens is not None and max_tokens != "None":
             metadata["max_tokens"] = int(max_tokens)
         if metadata:
-            span._set_ctx_item(METADATA, metadata)
+            core.set_item(METADATA, metadata)

     def _llmobs_set_tags_from_llm(
         self, span: Span, args: List[Any], kwargs: Dict[str, Any], completions: Any, is_workflow: bool = False
@@ -146,7 +147,7 @@ def _llmobs_set_tags_from_llm(
         else:
             input_messages = [{"content": str(prompt)} for prompt in prompts]

-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "workflow" if is_workflow else "llm",
                 MODEL_NAME: span.get_tag(MODEL) or "",
@@ -156,7 +157,7 @@
         )

         if span.error:
-            span._set_ctx_item(output_tag_key, [{"content": ""}])
+            core.set_item(output_tag_key, [{"content": ""}])
             return
         if stream:
             message_content = [{"content": completions}]  # single completion for streams
@@ -170,8 +171,8 @@
             OUTPUT_TOKENS_METRIC_KEY: output_tokens,
             TOTAL_TOKENS_METRIC_KEY: total_tokens,
         }
-        span._set_ctx_item(METRICS, metrics)
-        span._set_ctx_item(output_tag_key, message_content)
+        core.set_item(METRICS, metrics)
+        core.set_item(output_tag_key, message_content)

     def _llmobs_set_tags_from_chat_model(
         self,
@@ -181,7 +182,7 @@
         chat_completions: Any,
         is_workflow: bool = False,
     ) -> None:
-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "workflow" if is_workflow else "llm",
                 MODEL_NAME: span.get_tag(MODEL) or "",
@@ -207,17 +208,17 @@
             )
             role = getattr(message, "role", ROLE_MAPPING.get(message.type, ""))
             input_messages.append({"content": str(content), "role": str(role)})
-        span._set_ctx_item(input_tag_key, input_messages)
+        core.set_item(input_tag_key, input_messages)

         if span.error:
-            span._set_ctx_item(output_tag_key, [{"content": ""}])
+            core.set_item(output_tag_key, [{"content": ""}])
             return

         output_messages = []
         if stream:
             content = chat_completions.content
             role = chat_completions.__class__.__name__.replace("MessageChunk", "").lower()  # AIMessageChunk --> ai
-            span._set_ctx_item(output_tag_key, [{"content": content, "role": ROLE_MAPPING.get(role, "")}])
+            core.set_item(output_tag_key, [{"content": content, "role": ROLE_MAPPING.get(role, "")}])
             return

         input_tokens, output_tokens, total_tokens = 0, 0, 0
@@ -253,15 +254,15 @@ def _llmobs_set_tags_from_chat_model(
             output_tokens = sum(v["output_tokens"] for v in tokens_per_choice_run_id.values())
             total_tokens = sum(v["total_tokens"] for v in tokens_per_choice_run_id.values())

-        span._set_ctx_item(output_tag_key, output_messages)
+        core.set_item(output_tag_key, output_messages)

         if not is_workflow and total_tokens > 0:
             metrics = {
                 INPUT_TOKENS_METRIC_KEY: input_tokens,
                 OUTPUT_TOKENS_METRIC_KEY: output_tokens,
                 TOTAL_TOKENS_METRIC_KEY: total_tokens,
             }
-            span._set_ctx_item(METRICS, metrics)
+            core.set_item(METRICS, metrics)

     def _extract_tool_calls(self, chat_completion_msg: Any) -> List[Dict[str, Any]]:
         """Extracts tool calls from a langchain chat completion."""
@@ -315,7 +316,7 @@ def _llmobs_set_meta_tags_from_chain(self, span: Span, args, kwargs, outputs: An
         formatted_outputs = ""
         if not span.error and outputs is not None:
             formatted_outputs = format_langchain_io(outputs)
-        span._set_ctx_items({SPAN_KIND: "workflow", INPUT_VALUE: formatted_inputs, OUTPUT_VALUE: formatted_outputs})
+        core.set_items({SPAN_KIND: "workflow", INPUT_VALUE: formatted_inputs, OUTPUT_VALUE: formatted_outputs})

     def _llmobs_set_meta_tags_from_embedding(
         self,
@@ -325,7 +326,7 @@
         output_embedding: Union[List[float], List[List[float]], None],
         is_workflow: bool = False,
     ) -> None:
-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "workflow" if is_workflow else "embedding",
                 MODEL_NAME: span.get_tag(MODEL) or "",
@@ -346,16 +347,16 @@
         ):
             if is_workflow:
                 formatted_inputs = format_langchain_io(input_texts)
-                span._set_ctx_item(input_tag_key, formatted_inputs)
+                core.set_item(input_tag_key, formatted_inputs)
             else:
                 if isinstance(input_texts, str):
                     input_texts = [input_texts]
                 input_documents = [Document(text=str(doc)) for doc in input_texts]
-                span._set_ctx_item(input_tag_key, input_documents)
+                core.set_item(input_tag_key, input_documents)
         except TypeError:
             log.warning("Failed to serialize embedding input data to JSON")
         if span.error or output_embedding is None:
-            span._set_ctx_item(output_tag_key, "")
+            core.set_item(output_tag_key, "")
             return
         try:
             if isinstance(output_embedding[0], float):
@@ -367,7 +368,7 @@
             output_values = output_embedding
             embeddings_count = len(output_embedding)
             embedding_dim = len(output_values[0])
-            span._set_ctx_item(
+            core.set_item(
                 output_tag_key,
                 "[{} embedding(s) returned with size {}]".format(embeddings_count, embedding_dim),
             )
@@ -382,7 +383,7 @@
         output_documents: Union[List[Any], None],
         is_workflow: bool = False,
     ) -> None:
-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "workflow" if is_workflow else "retrieval",
                 MODEL_NAME: span.get_tag(MODEL) or "",
@@ -392,12 +393,12 @@
         input_query = get_argument_value(args, kwargs, 0, "query")
         if input_query is not None:
             formatted_inputs = format_langchain_io(input_query)
-            span._set_ctx_item(INPUT_VALUE, formatted_inputs)
+            core.set_item(INPUT_VALUE, formatted_inputs)
         if span.error or not output_documents or not isinstance(output_documents, list):
-            span._set_ctx_item(OUTPUT_VALUE, "")
+            core.set_item(OUTPUT_VALUE, "")
             return
         if is_workflow:
-            span._set_ctx_item(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(output_documents)))
+            core.set_item(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(output_documents)))
             return
         documents = []
         for d in output_documents:
@@ -406,9 +407,9 @@
             metadata = getattr(d, "metadata", {})
             doc["name"] = metadata.get("name", doc["id"])
             documents.append(doc)
-        span._set_ctx_item(OUTPUT_DOCUMENTS, format_langchain_io(documents))
+        core.set_item(OUTPUT_DOCUMENTS, format_langchain_io(documents))
         # we set the value as well to ensure that the UI would display it in case the span was the root
-        span._set_ctx_item(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(documents)))
+        core.set_item(OUTPUT_VALUE, "[{} document(s) retrieved]".format(len(documents)))

     def _llmobs_set_meta_tags_from_tool(self, span: Span, tool_inputs: Dict[str, Any], tool_output: object) -> None:
         metadata = json.loads(str(span.get_tag(METADATA))) if span.get_tag(METADATA) else {}
@@ -423,7 +424,7 @@ def _llmobs_set_meta_tags_from_tool(self, span: Span, tool_inputs: Dict[str, Any
         formatted_outputs = ""
         if not span.error and tool_output is not None:
             formatted_outputs = format_langchain_io(tool_output)
-        span._set_ctx_items(
+        core.set_items(
             {
                 SPAN_KIND: "tool",
                 METADATA: metadata,
