diff --git a/aws-opentelemetry-distro/pyproject.toml b/aws-opentelemetry-distro/pyproject.toml index d4f9ca204..cc738c435 100644 --- a/aws-opentelemetry-distro/pyproject.toml +++ b/aws-opentelemetry-distro/pyproject.toml @@ -25,61 +25,61 @@ classifiers = [ ] dependencies = [ - "opentelemetry-api == 1.27.0", - "opentelemetry-sdk == 1.27.0", - "opentelemetry-exporter-otlp-proto-grpc == 1.27.0", - "opentelemetry-exporter-otlp-proto-http == 1.27.0", - "opentelemetry-propagator-b3 == 1.27.0", - "opentelemetry-propagator-jaeger == 1.27.0", - "opentelemetry-exporter-otlp-proto-common == 1.27.0", - "opentelemetry-sdk-extension-aws == 2.0.2", - "opentelemetry-propagator-aws-xray == 1.0.1", - "opentelemetry-distro == 0.48b0", - "opentelemetry-propagator-ot-trace == 0.48b0", - "opentelemetry-instrumentation == 0.48b0", - "opentelemetry-instrumentation-aws-lambda == 0.48b0", - "opentelemetry-instrumentation-aio-pika == 0.48b0", - "opentelemetry-instrumentation-aiohttp-client == 0.48b0", - "opentelemetry-instrumentation-aiopg == 0.48b0", - "opentelemetry-instrumentation-asgi == 0.48b0", - "opentelemetry-instrumentation-asyncpg == 0.48b0", - "opentelemetry-instrumentation-boto == 0.48b0", - "opentelemetry-instrumentation-boto3sqs == 0.48b0", - "opentelemetry-instrumentation-botocore == 0.48b0", - "opentelemetry-instrumentation-celery == 0.48b0", - "opentelemetry-instrumentation-confluent-kafka == 0.48b0", - "opentelemetry-instrumentation-dbapi == 0.48b0", - "opentelemetry-instrumentation-django == 0.48b0", - "opentelemetry-instrumentation-elasticsearch == 0.48b0", - "opentelemetry-instrumentation-falcon == 0.48b0", - "opentelemetry-instrumentation-fastapi == 0.48b0", - "opentelemetry-instrumentation-flask == 0.48b0", - "opentelemetry-instrumentation-grpc == 0.48b0", - "opentelemetry-instrumentation-httpx == 0.48b0", - "opentelemetry-instrumentation-jinja2 == 0.48b0", - "opentelemetry-instrumentation-kafka-python == 0.48b0", - "opentelemetry-instrumentation-logging == 0.48b0", - "opentelemetry-instrumentation-mysql == 0.48b0", - "opentelemetry-instrumentation-mysqlclient == 0.48b0", - "opentelemetry-instrumentation-pika == 0.48b0", - "opentelemetry-instrumentation-psycopg2 == 0.48b0", - "opentelemetry-instrumentation-pymemcache == 0.48b0", - "opentelemetry-instrumentation-pymongo == 0.48b0", - "opentelemetry-instrumentation-pymysql == 0.48b0", - "opentelemetry-instrumentation-pyramid == 0.48b0", - "opentelemetry-instrumentation-redis == 0.48b0", - "opentelemetry-instrumentation-remoulade == 0.48b0", - "opentelemetry-instrumentation-requests == 0.48b0", - "opentelemetry-instrumentation-sqlalchemy == 0.48b0", - "opentelemetry-instrumentation-sqlite3 == 0.48b0", - "opentelemetry-instrumentation-starlette == 0.48b0", - "opentelemetry-instrumentation-system-metrics == 0.48b0", - "opentelemetry-instrumentation-tornado == 0.48b0", - "opentelemetry-instrumentation-tortoiseorm == 0.48b0", - "opentelemetry-instrumentation-urllib == 0.48b0", - "opentelemetry-instrumentation-urllib3 == 0.48b0", - "opentelemetry-instrumentation-wsgi == 0.48b0", - "opentelemetry-instrumentation-cassandra == 0.48b0", + "opentelemetry-api >= 1.29.0", + "opentelemetry-sdk >= 1.29.0", + "opentelemetry-exporter-otlp-proto-grpc >= 1.29.0", + "opentelemetry-exporter-otlp-proto-http >= 1.29.0", + "opentelemetry-propagator-b3 >= 1.29.0", + "opentelemetry-propagator-jaeger >= 1.29.0", + "opentelemetry-exporter-otlp-proto-common >= 1.29.0", + "opentelemetry-sdk-extension-aws >= 2.0.2", + "opentelemetry-propagator-aws-xray >= 1.0.1", + 
"opentelemetry-distro >= 0.50b0", + "opentelemetry-propagator-ot-trace >= 0.50b0", + "opentelemetry-instrumentation >= 0.50b0", + "opentelemetry-instrumentation-aws-lambda >= 0.50b0", + "opentelemetry-instrumentation-aio-pika >= 0.50b0", + "opentelemetry-instrumentation-aiohttp-client >= 0.50b0", + "opentelemetry-instrumentation-aiopg >= 0.50b0", + "opentelemetry-instrumentation-asgi >= 0.50b0", + "opentelemetry-instrumentation-asyncpg >= 0.50b0", + "opentelemetry-instrumentation-boto >= 0.50b0", + "opentelemetry-instrumentation-boto3sqs >= 0.50b0", + "opentelemetry-instrumentation-botocore >= 0.50b0", + "opentelemetry-instrumentation-celery >= 0.50b0", + "opentelemetry-instrumentation-confluent-kafka >= 0.50b0", + "opentelemetry-instrumentation-dbapi >= 0.50b0", + "opentelemetry-instrumentation-django >= 0.50b0", + "opentelemetry-instrumentation-elasticsearch >= 0.50b0", + "opentelemetry-instrumentation-falcon >= 0.50b0", + "opentelemetry-instrumentation-fastapi >= 0.50b0", + "opentelemetry-instrumentation-flask >= 0.50b0", + "opentelemetry-instrumentation-grpc >= 0.50b0", + "opentelemetry-instrumentation-httpx >= 0.50b0", + "opentelemetry-instrumentation-jinja2 >= 0.50b0", + "opentelemetry-instrumentation-kafka-python >= 0.50b0", + "opentelemetry-instrumentation-logging >= 0.50b0", + "opentelemetry-instrumentation-mysql >= 0.50b0", + "opentelemetry-instrumentation-mysqlclient >= 0.50b0", + "opentelemetry-instrumentation-pika >= 0.50b0", + "opentelemetry-instrumentation-psycopg2 >= 0.50b0", + "opentelemetry-instrumentation-pymemcache >= 0.50b0", + "opentelemetry-instrumentation-pymongo >= 0.50b0", + "opentelemetry-instrumentation-pymysql >= 0.50b0", + "opentelemetry-instrumentation-pyramid >= 0.50b0", + "opentelemetry-instrumentation-redis >= 0.50b0", + "opentelemetry-instrumentation-remoulade >= 0.50b0", + "opentelemetry-instrumentation-requests >= 0.50b0", + "opentelemetry-instrumentation-sqlalchemy >= 0.50b0", + "opentelemetry-instrumentation-sqlite3 >= 0.50b0", + "opentelemetry-instrumentation-starlette >= 0.50b0", + "opentelemetry-instrumentation-system-metrics >= 0.50b0", + "opentelemetry-instrumentation-tornado >= 0.50b0", + "opentelemetry-instrumentation-tortoiseorm >= 0.50b0", + "opentelemetry-instrumentation-urllib >= 0.50b0", + "opentelemetry-instrumentation-urllib3 >= 0.50b0", + "opentelemetry-instrumentation-wsgi >= 0.50b0", + "opentelemetry-instrumentation-cassandra >= 0.50b0", ] [project.optional-dependencies] diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py index 847f50fb1..011a1d19d 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py @@ -1,6 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
 # SPDX-License-Identifier: Apache-2.0
 
+import os
 import sys
 from logging import Logger, getLogger
@@ -8,6 +9,7 @@
 _logger: Logger = getLogger(__name__)
 
+AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
 
 
 def is_installed(req: str) -> bool:
     """Is the given required package installed?"""
@@ -21,3 +23,7 @@ def is_installed(req: str) -> bool:
         _logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc)
         return False
     return True
+
+
+def is_agent_observability_enabled() -> bool:
+    return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
index e4be93d99..b2a695536 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py
@@ -10,6 +10,7 @@
 from typing_extensions import override
 
 from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
 from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
+from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
 from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
@@ -27,7 +28,7 @@
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
-from opentelemetry._logs import set_logger_provider
+from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -160,6 +161,12 @@ def _initialize_components():
     sampler_name = _get_sampler()
     sampler = _custom_import_sampler(sampler_name, resource)
 
+    # Initialize logging before tracing so the global LoggerProvider is already
+    # set when _customize_span_exporter wires it into the span exporter below.
+    logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
+    if logging_enabled.strip().lower() == "true":
+        _init_logging(log_exporters, resource)
+
     _init_tracing(
         exporters=trace_exporters,
         id_generator=id_generator,
@@ -167,9 +174,6 @@
         resource=resource,
     )
     _init_metrics(metric_exporters, resource)
-    logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
-    if logging_enabled.strip().lower() == "true":
-        _init_logging(log_exporters, resource)
 
 
 def _init_logging(
@@ -359,7 +363,15 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
         _logger.info("Detected using AWS OTLP Traces Endpoint.")
 
         if isinstance(span_exporter, OTLPSpanExporter):
-            span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
+            if is_agent_observability_enabled():
+                # Hand the global LoggerProvider to the exporter so LLO content
+                # can be emitted as Gen AI Events at export time.
+                span_exporter = OTLPAwsSpanExporter(
+                    endpoint=traces_endpoint,
+                    logger_provider=get_logger_provider(),
+                )
+            else:
+                span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
 
         else:
             _logger.warning(
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
index 5fd5d744d..7defb5d47 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/traces/otlp_aws_span_exporter.py
@@ -1,11 +1,16 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import Dict, Optional
+from typing import Dict, Optional, Sequence
 
+from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
+from amazon.opentelemetry.distro.llo_handler import LLOHandler
 from opentelemetry.exporter.otlp.proto.http import Compression
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk.trace import ReadableSpan
+from opentelemetry.sdk.trace.export import SpanExportResult
 
 
 class OTLPAwsSpanExporter(OTLPSpanExporter):
@@ -18,9 +23,14 @@ def __init__(
         headers: Optional[Dict[str, str]] = None,
         timeout: Optional[int] = None,
         compression: Optional[Compression] = None,
+        logger_provider: Optional[LoggerProvider] = None,
     ):
         self._aws_region = None
+        # The handler is only created when a LoggerProvider is supplied (agent
+        # observability enabled); export() must tolerate its absence.
+        self._llo_handler = LLOHandler(logger_provider) if logger_provider else None
+
         if endpoint:
             self._aws_region = endpoint.split(".")[1]
 
@@ -35,3 +45,12 @@
             compression,
             session=AwsAuthSession(aws_region=self._aws_region, service="xray"),
         )
+
+    def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
+        # Strip LLO content out of spans (emitting it as Gen AI Events) before
+        # the spans are serialized and sent to the AWS OTLP endpoint.
+        if is_agent_observability_enabled() and self._llo_handler is not None:
+            llo_processed_spans = self._llo_handler.process_spans(spans)
+            return super().export(llo_processed_spans)
+
+        return super().export(spans)
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
new file mode 100644
index 000000000..4c3706d96
--- /dev/null
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/llo_handler.py
@@ -0,0 +1,684 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+import re
+
+from typing import Any, Dict, List, Optional, Sequence
+
+from opentelemetry._events import Event
+from opentelemetry.attributes import BoundedAttributes
+from opentelemetry.sdk._events import EventLoggerProvider
+from opentelemetry.sdk._logs import LoggerProvider
+from opentelemetry.sdk.trace import ReadableSpan, Event as SpanEvent
+
+# Message event types
+GEN_AI_SYSTEM_MESSAGE = "gen_ai.system.message"
+GEN_AI_USER_MESSAGE = "gen_ai.user.message"
+GEN_AI_ASSISTANT_MESSAGE = "gen_ai.assistant.message"
+
+# Framework-specific attribute keys
+TRACELOOP_ENTITY_INPUT = "traceloop.entity.input"
+TRACELOOP_ENTITY_OUTPUT = "traceloop.entity.output"
+OPENINFERENCE_INPUT_VALUE = "input.value"
+OPENINFERENCE_OUTPUT_VALUE = "output.value" +OPENLIT_PROMPT = "gen_ai.prompt" +OPENLIT_COMPLETION = "gen_ai.completion" +OPENLIT_REVISED_PROMPT = "gen_ai.content.revised_prompt" + +# Roles +ROLE_SYSTEM = "system" +ROLE_USER = "user" +ROLE_ASSISTANT = "assistant" + +_logger = logging.getLogger(__name__) + + +class LLOHandler: + """ + Utility class for handling Large Language Objects (LLO) in OpenTelemetry spans. + + LLOHandler performs three primary functions: + 1. Identifies Large Language Objects (LLO) content in spans + 2. Extracts and transforms these attributes into OpenTelemetry Gen AI Events + 3. Filters LLO from spans to maintain privacy and reduce span size + + Supported frameworks and their attribute patterns: + - Standard Gen AI: + - gen_ai.prompt.{n}.content: Structured prompt content + - gen_ai.prompt.{n}.role: Role for prompt content (system, user, assistant, etc.) + - gen_ai.completion.{n}.content: Structured completion content + - gen_ai.completion.{n}.role: Role for completion content (usually assistant) + + - Traceloop: + - traceloop.entity.input: Input text for LLM operations + - traceloop.entity.output: Output text from LLM operations + - traceloop.entity.name: Name of the entity processing the LLO + + - OpenLit: + - gen_ai.prompt: Direct prompt text (treated as user message) + - gen_ai.completion: Direct completion text (treated as assistant message) + - gen_ai.content.revised_prompt: Revised prompt text (treated as system message) + + - OpenInference: + - input.value: Direct input prompt + - output.value: Direct output response + - llm.input_messages.{n}.message.content: Individual structured input messages + - llm.input_messages.{n}.message.role: Role for input messages + - llm.output_messages.{n}.message.content: Individual structured output messages + - llm.output_messages.{n}.message.role: Role for output messages + - llm.model_name: Model name used for the LLM operation + """ + + def __init__(self, logger_provider: LoggerProvider): + """ + Initialize an LLOHandler with the specified logger provider. + + This constructor sets up the event logger provider, configures the event logger, + and initializes the patterns used to identify LLO attributes. + + Args: + logger_provider: The OpenTelemetry LoggerProvider used for emitting events. 
This is the global LoggerProvider instance injected by AwsOpenTelemetryConfigurator.
+        """
+        self._logger_provider = logger_provider
+
+        self._event_logger_provider = EventLoggerProvider(logger_provider=self._logger_provider)
+        self._event_logger = self._event_logger_provider.get_event_logger("gen_ai.events")
+
+        # Patterns for attribute filtering
+        self._exact_match_patterns = [
+            TRACELOOP_ENTITY_INPUT,
+            TRACELOOP_ENTITY_OUTPUT,
+            OPENLIT_PROMPT,
+            OPENLIT_COMPLETION,
+            OPENLIT_REVISED_PROMPT,
+            OPENINFERENCE_INPUT_VALUE,
+            OPENINFERENCE_OUTPUT_VALUE,
+        ]
+
+        # Pre-compile regex patterns for better performance
+        self._regex_patterns = [
+            re.compile(r"^gen_ai\.prompt\.\d+\.content$"),
+            re.compile(r"^gen_ai\.completion\.\d+\.content$"),
+            re.compile(r"^llm\.input_messages\.\d+\.message\.content$"),
+            re.compile(r"^llm\.output_messages\.\d+\.message\.content$"),
+        ]
+
+        # Additional pre-compiled patterns used in extraction methods
+        self._prompt_content_pattern = re.compile(r"^gen_ai\.prompt\.(\d+)\.content$")
+        self._completion_content_pattern = re.compile(r"^gen_ai\.completion\.(\d+)\.content$")
+        self._openinference_input_msg_pattern = re.compile(r"^llm\.input_messages\.(\d+)\.message\.content$")
+        self._openinference_output_msg_pattern = re.compile(r"^llm\.output_messages\.(\d+)\.message\.content$")
+
+    def process_spans(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]:
+        """
+        Processes a sequence of spans to extract and filter LLO attributes.
+
+        For each span, this method:
+        1. Extracts LLO attributes and emits them as Gen AI Events
+        2. Filters out LLO attributes from the span to maintain privacy
+        3. Processes any LLO attributes in span events
+        4. Preserves non-LLO attributes in the span
+
+        Handles LLO attributes from multiple frameworks:
+        - Standard Gen AI (structured prompt/completion pattern)
+        - Traceloop (entity input/output pattern)
+        - OpenLit (direct prompt/completion pattern)
+        - OpenInference (input/output value and structured messages pattern)
+
+        Args:
+            spans: A sequence of OpenTelemetry ReadableSpan objects to process
+
+        Returns:
+            List[ReadableSpan]: Modified spans with LLO attributes removed
+        """
+        modified_spans = []
+
+        for span in spans:
+            self._emit_llo_attributes(span, span.attributes)
+            updated_attributes = self._filter_attributes(span.attributes)
+
+            if isinstance(span.attributes, BoundedAttributes):
+                span._attributes = BoundedAttributes(
+                    maxlen=span.attributes.maxlen,
+                    attributes=updated_attributes,
+                    immutable=span.attributes._immutable,
+                    max_value_len=span.attributes.max_value_len,
+                )
+            else:
+                span._attributes = updated_attributes
+
+            self.process_span_events(span)
+
+            modified_spans.append(span)
+
+        return modified_spans
+
+    def process_span_events(self, span: ReadableSpan) -> None:
+        """
+        Process events within a span to extract and filter LLO attributes.
+
+        For each event in the span, this method:
+        1. Emits LLO attributes found in event attributes as Gen AI Events
+        2. Filters out LLO attributes from event attributes
+        3. Creates updated events with filtered attributes
+        4. Replaces the original span events with updated events
+
+        This ensures that LLO attributes are properly handled even when they appear
+        in span events rather than directly in the span's attributes.
+
+        Args:
+            span: The ReadableSpan to process events for
+
+        Returns:
+            None: The span is modified in-place
+        """
+        if not span.events:
+            return
+
+        updated_events = []
+
+        for event in span.events:
+            if not event.attributes:
+                updated_events.append(event)
+                continue
+
+            self._emit_llo_attributes(span, event.attributes, event_timestamp=event.timestamp)
+
+            updated_event_attributes = self._filter_attributes(event.attributes)
+
+            if len(updated_event_attributes) != len(event.attributes):
+                limit = None
+                if isinstance(event.attributes, BoundedAttributes):
+                    limit = event.attributes.maxlen
+
+                updated_event = SpanEvent(
+                    name=event.name, attributes=updated_event_attributes, timestamp=event.timestamp, limit=limit
+                )
+
+                updated_events.append(updated_event)
+            else:
+                updated_events.append(event)
+
+        span._events = updated_events
+
+    def _emit_llo_attributes(
+        self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None
+    ) -> None:
+        """
+        Extract Gen AI Events from LLO attributes and emit them via the event logger.
+
+        This method:
+        1. Collects LLO attributes from multiple frameworks using specialized extractors
+        2. Converts each LLO attribute into appropriate Gen AI Events
+        3. Emits all collected events through the event logger
+
+        Supported frameworks:
+        - Standard Gen AI: Structured prompt/completion with roles
+        - Traceloop: Entity input/output
+        - OpenLit: Direct prompt/completion/revised prompt
+        - OpenInference: Direct values and structured messages
+
+        Args:
+            span: The source ReadableSpan containing the attributes
+            attributes: Dictionary of attributes to process
+            event_timestamp: Optional timestamp to override span timestamps
+
+        Returns:
+            None: Events are emitted via the event logger
+        """
+        all_events = []
+        all_events.extend(self._extract_gen_ai_prompt_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_gen_ai_completion_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_traceloop_events(span, attributes, event_timestamp))
+        all_events.extend(self._extract_openlit_span_event_attributes(span, attributes, event_timestamp))
+        all_events.extend(self._extract_openinference_attributes(span, attributes, event_timestamp))
+
+        for event in all_events:
+            self._event_logger.emit(event)
+            _logger.debug("Emitted Gen AI Event: %s", event.name)
+
+    def _filter_attributes(self, attributes: Dict[str, Any]) -> Dict[str, Any]:
+        """
+        Create a new attributes dictionary with LLO attributes removed.
+
+        This method creates a new dictionary containing only non-LLO attributes,
+        preserving the original values while filtering out sensitive LLO content.
+        This helps maintain privacy and reduces the size of spans.
+
+        Args:
+            attributes: Original dictionary of span or event attributes
+
+        Returns:
+            Dict[str, Any]: New dictionary with LLO attributes removed
+        """
+        filtered_attributes = {}
+
+        for key, value in attributes.items():
+            if not self._is_llo_attribute(key):
+                filtered_attributes[key] = value
+
+        return filtered_attributes
+
+    def _is_llo_attribute(self, key: str) -> bool:
+        """
+        Determine if an attribute key contains LLO content based on pattern matching.
+
+        Checks attribute keys against two types of patterns:
+        1. Exact match patterns (complete string equality):
+           - Traceloop: "traceloop.entity.input", "traceloop.entity.output"
+           - OpenLit: "gen_ai.prompt", "gen_ai.completion", "gen_ai.content.revised_prompt"
+           - OpenInference: "input.value", "output.value"
+
+        2.
Regex match patterns (regular expression matching): + - Standard Gen AI: "gen_ai.prompt.{n}.content", "gen_ai.completion.{n}.content" + - OpenInference: "llm.input_messages.{n}.message.content", + "llm.output_messages.{n}.message.content" + + Args: + key: The attribute key to check + + Returns: + bool: True if the key matches any LLO pattern, False otherwise + """ + # Check exact matches first (faster) + if key in self._exact_match_patterns: + return True + + # Then check regex patterns + return any(pattern.match(key) for pattern in self._regex_patterns) + + def _extract_gen_ai_prompt_events( + self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None + ) -> List[Event]: + """ + Extract Gen AI Events from structured prompt attributes. + + Processes attributes matching the pattern `gen_ai.prompt.{n}.content` and their + associated `gen_ai.prompt.{n}.role` attributes to create appropriate events. + + Event types are determined by the role: + 1. `system` → `gen_ai.system.message` Event + 2. `user` → `gen_ai.user.message` Event + 3. `assistant` → `gen_ai.assistant.message` Event + 4. `function` → `gen_ai.{gen_ai.system}.message` custom Event + 5. `unknown` → `gen_ai.{gen_ai.system}.message` custom Event + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span.start_time + + Returns: + List[Event]: Events created from prompt attributes + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + # Use helper method to get appropriate timestamp (prompts are inputs) + prompt_timestamp = self._get_timestamp(span, event_timestamp, is_input=True) + + for key, value in attributes.items(): + match = self._prompt_content_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"gen_ai.prompt.{index}.role" + role = attributes.get(role_key, "unknown") + + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key} + body = {"content": value, "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, + span_ctx=span_ctx, + timestamp=prompt_timestamp, + attributes=event_attributes, + body=body, + ) + + events.append(event) + + return events + + def _extract_gen_ai_completion_events( + self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None + ) -> List[Event]: + """ + Extract Gen AI Events from structured completion attributes. + + Processes attributes matching the pattern `gen_ai.completion.{n}.content` and their + associated `gen_ai.completion.{n}.role` attributes to create appropriate events. + + Event types are determined by the role: + 1. `assistant` → `gen_ai.assistant.message` Event (most common) + 2. 
Other roles → `gen_ai.{gen_ai.system}.message` custom Event + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span.end_time + + Returns: + List[Event]: Events created from completion attributes + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + # Use helper method to get appropriate timestamp (completions are outputs) + completion_timestamp = self._get_timestamp(span, event_timestamp, is_input=False) + + for key, value in attributes.items(): + match = self._completion_content_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"gen_ai.completion.{index}.role" + role = attributes.get(role_key, "unknown") + + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key} + body = {"content": value, "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, + span_ctx=span_ctx, + timestamp=completion_timestamp, + attributes=event_attributes, + body=body, + ) + + events.append(event) + + return events + + def _extract_traceloop_events( + self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None + ) -> List[Event]: + """ + Extract Gen AI Events from Traceloop attributes. + + Processes Traceloop-specific attributes: + - `traceloop.entity.input`: Input data (uses span.start_time) + - `traceloop.entity.output`: Output data (uses span.end_time) + - `traceloop.entity.name`: Used as the gen_ai.system value + + Creates generic `gen_ai.{entity_name}.message` events for both input and output. + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span timestamps + + Returns: + List[Event]: Events created from Traceloop attributes + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("traceloop.entity.name", "unknown") + + # Use helper methods to get appropriate timestamps + input_timestamp = self._get_timestamp(span, event_timestamp, is_input=True) + output_timestamp = self._get_timestamp(span, event_timestamp, is_input=False) + + traceloop_attrs = [ + (TRACELOOP_ENTITY_INPUT, input_timestamp, ROLE_USER), # Treat input as user role + (TRACELOOP_ENTITY_OUTPUT, output_timestamp, ROLE_ASSISTANT), # Treat output as assistant role + ] + + for attr_key, timestamp, role in traceloop_attrs: + if attr_key in attributes: + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key} + body = {"content": attributes[attr_key], "role": role} + + # Custom event name for Traceloop (always use system-specific format) + event_name = f"gen_ai.{gen_ai_system}.message" + + event = self._get_gen_ai_event( + name=event_name, + span_ctx=span_ctx, + timestamp=timestamp, + attributes=event_attributes, + body=body, + ) + events.append(event) + + return events + + def _extract_openlit_span_event_attributes( + self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None + ) -> List[Event]: + """ + Extract Gen AI Events from OpenLit direct attributes. 
+ + OpenLit uses direct key-value pairs for LLO attributes: + - `gen_ai.prompt`: Direct prompt text (treated as user message) + - `gen_ai.completion`: Direct completion text (treated as assistant message) + - `gen_ai.content.revised_prompt`: Revised prompt text (treated as system message) + + The event timestamps are set based on attribute type: + - Prompt and revised prompt: span.start_time + - Completion: span.end_time + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span timestamps + + Returns: + List[Event]: Events created from OpenLit attributes + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("gen_ai.system", "unknown") + + # Use helper methods to get appropriate timestamps + prompt_timestamp = self._get_timestamp(span, event_timestamp, is_input=True) + completion_timestamp = self._get_timestamp(span, event_timestamp, is_input=False) + + openlit_event_attrs = [ + (OPENLIT_PROMPT, prompt_timestamp, ROLE_USER), # Assume user role for direct prompts + (OPENLIT_COMPLETION, completion_timestamp, ROLE_ASSISTANT), # Assume assistant role for completions + (OPENLIT_REVISED_PROMPT, prompt_timestamp, ROLE_SYSTEM), # Assume system role for revised prompts + ] + + for attr_key, timestamp, role in openlit_event_attrs: + if attr_key in attributes: + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key} + body = {"content": attributes[attr_key], "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, + span_ctx=span_ctx, + timestamp=timestamp, + attributes=event_attributes, + body=body, + ) + + events.append(event) + + return events + + def _extract_openinference_attributes( + self, span: ReadableSpan, attributes: Dict[str, Any], event_timestamp: Optional[int] = None + ) -> List[Event]: + """ + Extract Gen AI Events from OpenInference attributes. + + OpenInference uses two patterns for LLO attributes: + 1. Direct values: + - `input.value`: Direct input prompt (treated as user message) + - `output.value`: Direct output response (treated as assistant message) + + 2. Structured messages: + - `llm.input_messages.{n}.message.content`: Individual input messages + - `llm.input_messages.{n}.message.role`: Role for input message + - `llm.output_messages.{n}.message.content`: Individual output messages + - `llm.output_messages.{n}.message.role`: Role for output message + + The LLM model name is extracted from the `llm.model_name` attribute + instead of `gen_ai.system` which other frameworks use. 
+ + Event timestamps are set based on message type: + - Input messages: span.start_time + - Output messages: span.end_time + + Args: + span: The source ReadableSpan containing the attributes + attributes: Dictionary of attributes to process + event_timestamp: Optional timestamp to override span timestamps + + Returns: + List[Event]: Events created from OpenInference attributes + """ + events = [] + span_ctx = span.context + gen_ai_system = span.attributes.get("llm.model_name", "unknown") + + # Use helper methods to get appropriate timestamps + input_timestamp = self._get_timestamp(span, event_timestamp, is_input=True) + output_timestamp = self._get_timestamp(span, event_timestamp, is_input=False) + + # Process direct value attributes + openinference_direct_attrs = [ + (OPENINFERENCE_INPUT_VALUE, input_timestamp, ROLE_USER), + (OPENINFERENCE_OUTPUT_VALUE, output_timestamp, ROLE_ASSISTANT), + ] + + for attr_key, timestamp, role in openinference_direct_attrs: + if attr_key in attributes: + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": attr_key} + body = {"content": attributes[attr_key], "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, span_ctx=span_ctx, timestamp=timestamp, attributes=event_attributes, body=body + ) + + events.append(event) + + # Process input messages + for key, value in attributes.items(): + match = self._openinference_input_msg_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"llm.input_messages.{index}.message.role" + role = attributes.get(role_key, ROLE_USER) # Default to user if role not specified + + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key} + body = {"content": value, "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, span_ctx=span_ctx, timestamp=input_timestamp, attributes=event_attributes, body=body + ) + + events.append(event) + + # Process output messages + for key, value in attributes.items(): + match = self._openinference_output_msg_pattern.match(key) + if not match: + continue + + index = match.group(1) + role_key = f"llm.output_messages.{index}.message.role" + role = attributes.get(role_key, ROLE_ASSISTANT) # Default to assistant if role not specified + + event_attributes = {"gen_ai.system": gen_ai_system, "original_attribute": key} + body = {"content": value, "role": role} + + # Use helper method to determine event name based on role + event_name = self._get_event_name_for_role(role, gen_ai_system) + + event = self._get_gen_ai_event( + name=event_name, span_ctx=span_ctx, timestamp=output_timestamp, attributes=event_attributes, body=body + ) + + events.append(event) + + return events + + def _get_event_name_for_role(self, role: str, gen_ai_system: str) -> str: + """ + Map a message role to the appropriate event name. + + Args: + role: The role of the message (system, user, assistant, etc.) 
+ gen_ai_system: The gen_ai system identifier + + Returns: + str: The appropriate event name for the given role + """ + if role == ROLE_SYSTEM: + return GEN_AI_SYSTEM_MESSAGE + elif role == ROLE_USER: + return GEN_AI_USER_MESSAGE + elif role == ROLE_ASSISTANT: + return GEN_AI_ASSISTANT_MESSAGE + else: + return f"gen_ai.{gen_ai_system}.message" + + def _get_timestamp(self, span: ReadableSpan, event_timestamp: Optional[int], is_input: bool) -> int: + """ + Determine the appropriate timestamp to use for an event. + + Args: + span: The source span + event_timestamp: Optional override timestamp + is_input: Whether this is an input (True) or output (False) message + + Returns: + int: The timestamp to use for the event + """ + if event_timestamp is not None: + return event_timestamp + + return span.start_time if is_input else span.end_time + + def _get_gen_ai_event(self, name, span_ctx, timestamp, attributes, body): + """ + Create and return a Gen AI Event with the specified parameters. + + This helper method constructs a fully configured OpenTelemetry Event object + that includes all necessary fields for proper event propagation and context. + + Args: + name: Event type name (e.g., gen_ai.system.message, gen_ai.user.message) + span_ctx: Span context to extract trace/span IDs from + timestamp: Timestamp for the event (nanoseconds) + attributes: Additional attributes to include with the event + body: Event body containing content and role information + + Returns: + Event: A fully configured OpenTelemetry Gen AI Event object with + proper trace context propagation + """ + return Event( + name=name, + timestamp=timestamp, + attributes=attributes, + body=body, + trace_id=span_ctx.trace_id, + span_id=span_ctx.span_id, + trace_flags=span_ctx.trace_flags, + ) diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py index a25e55330..70dfe36c4 100644 --- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py +++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_bedrock_patches.py @@ -192,7 +192,7 @@ def extract_attributes(self, attributes: _AttributeMapT): if request_param_value: attributes[attribute_key] = request_param_value - def on_success(self, span: Span, result: _BotoResultT): + def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None): if self._operation_class is None: return @@ -220,6 +220,10 @@ def extract_attributes(self, attributes: _AttributeMapT): knowledge_base_id = self._call_context.params.get(_KNOWLEDGE_BASE_ID) if knowledge_base_id: attributes[AWS_BEDROCK_KNOWLEDGE_BASE_ID] = knowledge_base_id + + def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None): + # Currently no attributes to extract from the result + pass class _BedrockExtension(_AwsSdkExtension): @@ -229,7 +233,7 @@ class _BedrockExtension(_AwsSdkExtension): """ # pylint: disable=no-self-use - def on_success(self, span: Span, result: _BotoResultT): + def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None): # _GUARDRAIL_ID can only be retrieved from the response, not from the request guardrail_id = result.get(_GUARDRAIL_ID) if guardrail_id: @@ -333,7 +337,7 @@ def _set_if_not_none(attributes, key, value): attributes[key] = value # pylint: disable=too-many-branches - def on_success(self, span: Span, result: Dict[str, Any]): + def on_success(self, span: Span, result: Dict[str, 
Any], instrumentor_context=None):
         model_id = self._call_context.params.get(_MODEL_ID)
 
         if not model_id:
diff --git a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
index 0f4a77d1e..d45404a0c 100644
--- a/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
+++ b/aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py
@@ -75,8 +75,8 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
 
     old_on_success = _LambdaExtension.on_success
 
-    def patch_on_success(self, span: Span, result: _BotoResultT):
-        old_on_success(self, span, result)
+    def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
+        old_on_success(self, span, result, instrumentor_context)
         lambda_configuration = result.get("Configuration", {})
         function_arn = lambda_configuration.get("FunctionArn")
         if function_arn:
@@ -180,8 +180,8 @@ def patch_extract_attributes(self, attributes: _AttributeMapT):
 
     old_on_success = _SqsExtension.on_success
 
-    def patch_on_success(self, span: Span, result: _BotoResultT):
-        old_on_success(self, span, result)
+    def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
+        old_on_success(self, span, result, instrumentor_context)
         queue_url = result.get("QueueUrl")
         if queue_url:
             span.set_attribute(AWS_SQS_QUEUE_URL, queue_url)
@@ -243,7 +243,7 @@ def extract_attributes(self, attributes: _AttributeMapT):
             attributes[AWS_SECRETSMANAGER_SECRET_ARN] = secret_id
 
     # pylint: disable=no-self-use
-    def on_success(self, span: Span, result: _BotoResultT):
+    def on_success(self, span: Span, result: _BotoResultT, instrumentor_context=None):
         secret_arn = result.get("ARN")
         if secret_arn:
             span.set_attribute(AWS_SECRETSMANAGER_SECRET_ARN, secret_arn)
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
index 87e6c4810..7b62cfb49 100644
--- a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
+++ b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_instrumentation_patch.py
@@ -147,7 +147,7 @@ def _test_unpatched_botocore_instrumentation(self):
         )
 
         # BedrockRuntime
-        self.assertFalse("bedrock-runtime" in _KNOWN_EXTENSIONS, "Upstream has added a bedrock-runtime extension")
+        self.assertTrue("bedrock-runtime" in _KNOWN_EXTENSIONS, "Upstream has removed the bedrock-runtime extension")
 
         # SecretsManager
         self.assertFalse("secretsmanager" in _KNOWN_EXTENSIONS, "Upstream has added a SecretsManager extension")
@@ -678,6 +678,7 @@ def _do_on_success(
     ) -> Dict[str, str]:
         span_mock: Span = MagicMock()
         mock_call_context = MagicMock()
+        mock_instrumentor_context = MagicMock()
         span_attributes: Dict[str, str] = {}
 
         def set_side_effect(set_key, set_value):
@@ -692,6 +693,6 @@ def set_side_effect(set_key, set_value):
         mock_call_context.params = params
 
         extension = _KNOWN_EXTENSIONS[service_name]()(mock_call_context)
-        extension.on_success(span_mock, result)
+        extension.on_success(span_mock, result, mock_instrumentor_context)
 
         return span_attributes
diff --git a/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py
new file mode 100644
index 000000000..bb4ac51be
--- /dev/null
+++
b/aws-opentelemetry-distro/tests/amazon/opentelemetry/distro/test_llo_handler.py @@ -0,0 +1,651 @@ +from unittest import TestCase +from unittest.mock import MagicMock, patch, call + +from amazon.opentelemetry.distro.llo_handler import LLOHandler +from opentelemetry._events import Event +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk.trace import ReadableSpan, SpanContext +from opentelemetry.trace import SpanKind, TraceFlags, TraceState + + +class TestLLOHandler(TestCase): + def setUp(self): + self.logger_provider_mock = MagicMock(spec=LoggerProvider) + self.event_logger_mock = MagicMock() + self.event_logger_provider_mock = MagicMock() + self.event_logger_provider_mock.get_event_logger.return_value = self.event_logger_mock + + with patch( + "amazon.opentelemetry.distro.llo_handler.EventLoggerProvider", return_value=self.event_logger_provider_mock + ): + self.llo_handler = LLOHandler(self.logger_provider_mock) + + def _create_mock_span(self, attributes=None, kind=SpanKind.INTERNAL): + """ + Helper method to create a mock span with given attributes + """ + if attributes is None: + attributes = {} + + span_context = SpanContext( + trace_id=0x123456789ABCDEF0123456789ABCDEF0, + span_id=0x123456789ABCDEF0, + is_remote=False, + trace_flags=TraceFlags.SAMPLED, + trace_state=TraceState.get_default(), + ) + + mock_span = MagicMock(spec=ReadableSpan) + mock_span.context = span_context + mock_span.attributes = attributes + mock_span.kind = kind + mock_span.start_time = 1234567890 + + return mock_span + + def test_init(self): + """ + Test initialization of LLOHandler + """ + self.assertEqual(self.llo_handler._logger_provider, self.logger_provider_mock) + self.assertEqual(self.llo_handler._event_logger_provider, self.event_logger_provider_mock) + self.event_logger_provider_mock.get_event_logger.assert_called_once_with("gen_ai.events") + + def test_is_llo_attribute_match(self): + """ + Test _is_llo_attribute method with matching patterns + """ + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.0.content")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt.123.content")) + + def test_is_llo_attribute_no_match(self): + """ + Test _is_llo_attribute method with non-matching patterns + """ + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.content")) + self.assertFalse(self.llo_handler._is_llo_attribute("gen_ai.prompt.abc.content")) + self.assertFalse(self.llo_handler._is_llo_attribute("some.other.attribute")) + + def test_is_llo_attribute_traceloop_match(self): + """ + Test _is_llo_attribute method with Traceloop patterns + """ + # Test exact matches for Traceloop attributes + self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.input")) + self.assertTrue(self.llo_handler._is_llo_attribute("traceloop.entity.output")) + + def test_is_llo_attribute_openlit_match(self): + """ + Test _is_llo_attribute method with OpenLit patterns + """ + # Test exact matches for direct OpenLit attributes + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.prompt")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.completion")) + self.assertTrue(self.llo_handler._is_llo_attribute("gen_ai.content.revised_prompt")) + + def test_is_llo_attribute_openinference_match(self): + """ + Test _is_llo_attribute method with OpenInference patterns + """ + # Test exact matches + self.assertTrue(self.llo_handler._is_llo_attribute("input.value")) + self.assertTrue(self.llo_handler._is_llo_attribute("output.value")) + + # Test 
regex matches + self.assertTrue(self.llo_handler._is_llo_attribute("llm.input_messages.0.message.content")) + self.assertTrue(self.llo_handler._is_llo_attribute("llm.output_messages.123.message.content")) + + def test_filter_attributes(self): + """ + Test _filter_attributes method + """ + attributes = { + "gen_ai.prompt.0.content": "test content", + "gen_ai.prompt.0.role": "user", + "normal.attribute": "value", + "another.normal.attribute": 123, + } + + filtered = self.llo_handler._filter_attributes(attributes) + + self.assertNotIn("gen_ai.prompt.0.content", filtered) + self.assertIn("gen_ai.prompt.0.role", filtered) + self.assertIn("normal.attribute", filtered) + self.assertIn("another.normal.attribute", filtered) + + def test_extract_gen_ai_prompt_events_system_role(self): + """ + Test _extract_gen_ai_prompt_events with system role + """ + attributes = { + "gen_ai.prompt.0.content": "system instruction", + "gen_ai.prompt.0.role": "system", + "gen_ai.system": "openai", + } + + span = self._create_mock_span(attributes) + + events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.system.message") + self.assertEqual(event.body["content"], "system instruction") + self.assertEqual(event.body["role"], "system") + self.assertEqual(event.attributes["gen_ai.system"], "openai") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.0.content") + + def test_extract_gen_ai_prompt_events_user_role(self): + """ + Test _extract_gen_ai_prompt_events with user role + """ + attributes = { + "gen_ai.prompt.0.content": "user question", + "gen_ai.prompt.0.role": "user", + "gen_ai.system": "anthropic", + } + + span = self._create_mock_span(attributes) + + events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.user.message") + self.assertEqual(event.body["content"], "user question") + self.assertEqual(event.body["role"], "user") + self.assertEqual(event.attributes["gen_ai.system"], "anthropic") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.0.content") + + def test_extract_gen_ai_prompt_events_assistant_role(self): + """ + Test _extract_gen_ai_prompt_events with assistant role + """ + attributes = { + "gen_ai.prompt.1.content": "assistant response", + "gen_ai.prompt.1.role": "assistant", + "gen_ai.system": "anthropic", + } + + span = self._create_mock_span(attributes) + + events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.assistant.message") + self.assertEqual(event.body["content"], "assistant response") + self.assertEqual(event.body["role"], "assistant") + self.assertEqual(event.attributes["gen_ai.system"], "anthropic") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.1.content") + + def test_extract_gen_ai_prompt_events_function_role(self): + """ + Test _extract_gen_ai_prompt_events with function role + """ + attributes = { + "gen_ai.prompt.2.content": "function data", + "gen_ai.prompt.2.role": "function", + "gen_ai.system": "openai", + } + + span = self._create_mock_span(attributes) + events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.openai.message") + 
self.assertEqual(event.body["content"], "function data") + self.assertEqual(event.body["role"], "function") + self.assertEqual(event.attributes["gen_ai.system"], "openai") + self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt.2.content") + + def test_extract_gen_ai_prompt_events_unknown_role(self): + """ + Test _extract_gen_ai_prompt_events with unknown role + """ + attributes = { + "gen_ai.prompt.3.content": "unknown type content", + "gen_ai.prompt.3.role": "unknown", + "gen_ai.system": "bedrock", + } + + span = self._create_mock_span(attributes) + events = self.llo_handler._extract_gen_ai_prompt_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.bedrock.message") + self.assertEqual(event.body["content"], "unknown type content") + self.assertEqual(event.body["role"], "unknown") + self.assertEqual(event.attributes["gen_ai.system"], "bedrock") + + def test_extract_gen_ai_completion_events_assistant_role(self): + """ + Test _extract_gen_ai_completion_events with assistant role + """ + attributes = { + "gen_ai.completion.0.content": "assistant completion", + "gen_ai.completion.0.role": "assistant", + "gen_ai.system": "openai", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 # end time for completion events + + events = self.llo_handler._extract_gen_ai_completion_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.assistant.message") + self.assertEqual(event.body["content"], "assistant completion") + self.assertEqual(event.body["role"], "assistant") + self.assertEqual(event.attributes["gen_ai.system"], "openai") + self.assertEqual(event.timestamp, 1234567899) + + def test_extract_gen_ai_completion_events_other_role(self): + """ + Test _extract_gen_ai_completion_events with non-assistant role + """ + attributes = { + "gen_ai.completion.1.content": "other completion", + "gen_ai.completion.1.role": "other", + "gen_ai.system": "anthropic", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + events = self.llo_handler._extract_gen_ai_completion_events(span, attributes) + + self.assertEqual(len(events), 1) + event = events[0] + self.assertEqual(event.name, "gen_ai.anthropic.message") + self.assertEqual(event.body["content"], "other completion") + self.assertEqual(event.attributes["gen_ai.system"], "anthropic") + + def test_extract_traceloop_events(self): + """ + Test _extract_traceloop_events + """ + attributes = { + "traceloop.entity.input": "input data", + "traceloop.entity.output": "output data", + "traceloop.entity.name": "my_entity", + } + + span = self._create_mock_span(attributes) + span.end_time = 1234567899 + + events = self.llo_handler._extract_traceloop_events(span, attributes) + + self.assertEqual(len(events), 2) + + input_event = events[0] + self.assertEqual(input_event.name, "gen_ai.my_entity.message") + self.assertEqual(input_event.body["content"], "input data") + self.assertEqual(input_event.attributes["gen_ai.system"], "my_entity") + self.assertEqual(input_event.attributes["original_attribute"], "traceloop.entity.input") + self.assertEqual(input_event.timestamp, 1234567890) # start_time + + output_event = events[1] + self.assertEqual(output_event.name, "gen_ai.my_entity.message") + self.assertEqual(output_event.body["content"], "output data") + self.assertEqual(output_event.attributes["gen_ai.system"], "my_entity") + 
+        self.assertEqual(output_event.attributes["original_attribute"], "traceloop.entity.output")
+        self.assertEqual(output_event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openlit_direct_prompt(self):
+        """
+        Test _extract_openlit_span_event_attributes with direct prompt attribute
+        """
+        attributes = {
+            "gen_ai.prompt": "user direct prompt",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.user.message")
+        self.assertEqual(event.body["content"], "user direct prompt")
+        self.assertEqual(event.body["role"], "user")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.prompt")
+        self.assertEqual(event.timestamp, 1234567890)  # start_time
+
+    def test_extract_openlit_direct_completion(self):
+        """
+        Test _extract_openlit_span_event_attributes with direct completion attribute
+        """
+        attributes = {
+            "gen_ai.completion": "assistant direct completion",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.assistant.message")
+        self.assertEqual(event.body["content"], "assistant direct completion")
+        self.assertEqual(event.body["role"], "assistant")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.completion")
+        self.assertEqual(event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openlit_all_attributes(self):
+        """
+        Test _extract_openlit_span_event_attributes with all OpenLit attributes
+        """
+        attributes = {
+            "gen_ai.prompt": "user prompt",
+            "gen_ai.completion": "assistant response",
+            "gen_ai.content.revised_prompt": "revised prompt",
+            "gen_ai.system": "langchain"
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 3)
+
+        # Check that all events have the correct system
+        for event in events:
+            self.assertEqual(event.attributes["gen_ai.system"], "langchain")
+
+        # Check we have the expected event types
+        event_types = {event.name for event in events}
+        self.assertIn("gen_ai.user.message", event_types)
+        self.assertIn("gen_ai.assistant.message", event_types)
+        self.assertIn("gen_ai.system.message", event_types)
+
+        # Check original attributes
+        original_attrs = {event.attributes["original_attribute"] for event in events}
+        self.assertIn("gen_ai.prompt", original_attrs)
+        self.assertIn("gen_ai.completion", original_attrs)
+        self.assertIn("gen_ai.content.revised_prompt", original_attrs)
+
+    def test_extract_openlit_revised_prompt(self):
+        """
+        Test _extract_openlit_span_event_attributes with revised prompt attribute
+        """
+        attributes = {
+            "gen_ai.content.revised_prompt": "revised system prompt",
+            "gen_ai.system": "openlit"
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openlit_span_event_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+        event = events[0]
+        self.assertEqual(event.name, "gen_ai.system.message")
+        self.assertEqual(event.body["content"], "revised system prompt")
+        self.assertEqual(event.body["role"], "system")
+        self.assertEqual(event.attributes["gen_ai.system"], "openlit")
+        self.assertEqual(event.attributes["original_attribute"], "gen_ai.content.revised_prompt")
+        self.assertEqual(event.timestamp, 1234567890)  # start_time
+
+    def test_extract_openinference_direct_attributes(self):
+        """
+        Test _extract_openinference_attributes with direct input/output values
+        """
+        attributes = {
+            "input.value": "user prompt",
+            "output.value": "assistant response",
+            "llm.model_name": "gpt-4",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openinference_attributes(span, attributes)
+
+        self.assertEqual(len(events), 2)
+
+        input_event = events[0]
+        self.assertEqual(input_event.name, "gen_ai.user.message")
+        self.assertEqual(input_event.body["content"], "user prompt")
+        self.assertEqual(input_event.body["role"], "user")
+        self.assertEqual(input_event.attributes["gen_ai.system"], "gpt-4")
+        self.assertEqual(input_event.attributes["original_attribute"], "input.value")
+        self.assertEqual(input_event.timestamp, 1234567890)  # start_time
+
+        output_event = events[1]
+        self.assertEqual(output_event.name, "gen_ai.assistant.message")
+        self.assertEqual(output_event.body["content"], "assistant response")
+        self.assertEqual(output_event.body["role"], "assistant")
+        self.assertEqual(output_event.attributes["gen_ai.system"], "gpt-4")
+        self.assertEqual(output_event.attributes["original_attribute"], "output.value")
+        self.assertEqual(output_event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openinference_structured_input_messages(self):
+        """
+        Test _extract_openinference_attributes with structured input messages
+        """
+        attributes = {
+            "llm.input_messages.0.message.content": "system prompt",
+            "llm.input_messages.0.message.role": "system",
+            "llm.input_messages.1.message.content": "user message",
+            "llm.input_messages.1.message.role": "user",
+            "llm.model_name": "claude-3",
+        }
+
+        span = self._create_mock_span(attributes)
+
+        events = self.llo_handler._extract_openinference_attributes(span, attributes)
+
+        self.assertEqual(len(events), 2)
+
+        system_event = events[0]
+        self.assertEqual(system_event.name, "gen_ai.system.message")
+        self.assertEqual(system_event.body["content"], "system prompt")
+        self.assertEqual(system_event.body["role"], "system")
+        self.assertEqual(system_event.attributes["gen_ai.system"], "claude-3")
+        self.assertEqual(system_event.attributes["original_attribute"], "llm.input_messages.0.message.content")
+
+        user_event = events[1]
+        self.assertEqual(user_event.name, "gen_ai.user.message")
+        self.assertEqual(user_event.body["content"], "user message")
+        self.assertEqual(user_event.body["role"], "user")
+        self.assertEqual(user_event.attributes["gen_ai.system"], "claude-3")
+        self.assertEqual(user_event.attributes["original_attribute"], "llm.input_messages.1.message.content")
+
+    def test_extract_openinference_structured_output_messages(self):
+        """
+        Test _extract_openinference_attributes with structured output messages
+        """
+        attributes = {
+            "llm.output_messages.0.message.content": "assistant response",
+            "llm.output_messages.0.message.role": "assistant",
+            "llm.model_name": "llama-3",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openinference_attributes(span, attributes)
+
+        self.assertEqual(len(events), 1)
+
+        output_event = events[0]
+        self.assertEqual(output_event.name, "gen_ai.assistant.message")
+        self.assertEqual(output_event.body["content"], "assistant response")
+        self.assertEqual(output_event.body["role"], "assistant")
+        self.assertEqual(output_event.attributes["gen_ai.system"], "llama-3")
+        self.assertEqual(output_event.attributes["original_attribute"], "llm.output_messages.0.message.content")
+        self.assertEqual(output_event.timestamp, 1234567899)  # end_time
+
+    def test_extract_openinference_mixed_attributes(self):
+        """
+        Test _extract_openinference_attributes with a mix of all attribute types
+        """
+        attributes = {
+            "input.value": "direct input",
+            "output.value": "direct output",
+            "llm.input_messages.0.message.content": "message input",
+            "llm.input_messages.0.message.role": "user",
+            "llm.output_messages.0.message.content": "message output",
+            "llm.output_messages.0.message.role": "assistant",
+            "llm.model_name": "bedrock.claude-3",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        events = self.llo_handler._extract_openinference_attributes(span, attributes)
+
+        self.assertEqual(len(events), 4)
+
+        # Verify all events have the correct model name
+        for event in events:
+            self.assertEqual(event.attributes["gen_ai.system"], "bedrock.claude-3")
+
+        # We don't need to check every detail since other tests do that,
+        # but we can verify we got all the expected event types
+        event_types = {event.name for event in events}
+        self.assertIn("gen_ai.user.message", event_types)
+        self.assertIn("gen_ai.assistant.message", event_types)
+
+        # Verify original attributes were correctly captured
+        original_attrs = {event.attributes["original_attribute"] for event in events}
+        self.assertIn("input.value", original_attrs)
+        self.assertIn("output.value", original_attrs)
+        self.assertIn("llm.input_messages.0.message.content", original_attrs)
+        self.assertIn("llm.output_messages.0.message.content", original_attrs)
+
+    def test_emit_llo_attributes(self):
+        """
+        Test _emit_llo_attributes
+        """
+        attributes = {
+            "gen_ai.prompt.0.content": "prompt content",
+            "gen_ai.prompt.0.role": "user",
+            "gen_ai.completion.0.content": "completion content",
+            "gen_ai.completion.0.role": "assistant",
+            "traceloop.entity.input": "traceloop input",
+            "traceloop.entity.name": "entity_name",
+            "gen_ai.system": "openai",
+        }
+
+        span = self._create_mock_span(attributes)
+        span.end_time = 1234567899
+
+        with patch.object(self.llo_handler, "_extract_gen_ai_prompt_events") as mock_extract_prompt, patch.object(
+            self.llo_handler, "_extract_gen_ai_completion_events"
+        ) as mock_extract_completion, patch.object(
+            self.llo_handler, "_extract_traceloop_events"
+        ) as mock_extract_traceloop, patch.object(
+            self.llo_handler, "_extract_openlit_span_event_attributes"
+        ) as mock_extract_openlit, patch.object(
+            self.llo_handler, "_extract_openinference_attributes"
+        ) as mock_extract_openinference:
+
+            # Create mocks with name attribute properly set
+            prompt_event = MagicMock(spec=Event)
+            prompt_event.name = "gen_ai.user.message"
+
+            completion_event = MagicMock(spec=Event)
+            completion_event.name = "gen_ai.assistant.message"
+
+            traceloop_event = MagicMock(spec=Event)
+            traceloop_event.name = "gen_ai.entity.message"
+
+            openlit_event = MagicMock(spec=Event)
+            openlit_event.name = "gen_ai.langchain.message"
+
+            openinference_event = MagicMock(spec=Event)
+            openinference_event.name = "gen_ai.anthropic.message"
+
+            mock_extract_prompt.return_value = [prompt_event]
+            mock_extract_completion.return_value = [completion_event]
+            mock_extract_traceloop.return_value = [traceloop_event]
+            mock_extract_openlit.return_value = [openlit_event]
+            mock_extract_openinference.return_value = [openinference_event]
+
+            self.llo_handler._emit_llo_attributes(span, attributes)
+
+            mock_extract_prompt.assert_called_once_with(span, attributes, None)
+            mock_extract_completion.assert_called_once_with(span, attributes, None)
+            mock_extract_traceloop.assert_called_once_with(span, attributes, None)
+            mock_extract_openlit.assert_called_once_with(span, attributes, None)
+            mock_extract_openinference.assert_called_once_with(span, attributes, None)
+
+            self.event_logger_mock.emit.assert_has_calls(
+                [
+                    call(prompt_event),
+                    call(completion_event),
+                    call(traceloop_event),
+                    call(openlit_event),
+                    call(openinference_event),
+                ]
+            )
+
+    def test_process_spans(self):
+        """
+        Test process_spans
+        """
+        attributes = {"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"}
+
+        span = self._create_mock_span(attributes)
+
+        with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object(
+            self.llo_handler, "_filter_attributes"
+        ) as mock_filter:
+
+            filtered_attributes = {"normal.attribute": "normal value"}
+            mock_filter.return_value = filtered_attributes
+
+            result = self.llo_handler.process_spans([span])
+
+            mock_emit.assert_called_once_with(span, attributes)
+            mock_filter.assert_called_once_with(attributes)
+
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0], span)
+            # Access the _attributes property that was set by the process_spans method
+            self.assertEqual(result[0]._attributes, filtered_attributes)
+
+    def test_process_spans_with_bounded_attributes(self):
+        """
+        Test process_spans with BoundedAttributes
+        """
+        from opentelemetry.attributes import BoundedAttributes
+
+        bounded_attrs = BoundedAttributes(
+            maxlen=10,
+            attributes={"gen_ai.prompt.0.content": "prompt content", "normal.attribute": "normal value"},
+            immutable=False,
+            max_value_len=1000,
+        )
+
+        span = self._create_mock_span(bounded_attrs)
+
+        with patch.object(self.llo_handler, "_emit_llo_attributes") as mock_emit, patch.object(
+            self.llo_handler, "_filter_attributes"
+        ) as mock_filter:
+
+            filtered_attributes = {"normal.attribute": "normal value"}
+            mock_filter.return_value = filtered_attributes
+
+            result = self.llo_handler.process_spans([span])
+
+            mock_emit.assert_called_once_with(span, bounded_attrs)
+            mock_filter.assert_called_once_with(bounded_attrs)
+
+            self.assertEqual(len(result), 1)
+            self.assertEqual(result[0], span)
+            # Check that we got a BoundedAttributes instance
+            self.assertIsInstance(result[0]._attributes, BoundedAttributes)
+            # Check the underlying dictionary content
+            self.assertEqual(dict(result[0]._attributes), filtered_attributes)
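---
Reviewer note (not part of the patch): the OpenLit tests above pin down the contract of
_extract_openlit_span_event_attributes without the implementation being visible in this hunk.
The sketch below is a minimal reading of that contract inferred only from the assertions; it is
not this PR's code. The Event dataclass is a stand-in for the real OpenTelemetry event type, and
the free function name and _OPENLIT_KEYS table are illustrative. Only the attribute keys, event
names, body fields, the original_attribute marker, and the start/end timestamp split come from
the tests themselves.

    from dataclasses import dataclass
    from typing import Any, Dict, List


    @dataclass
    class Event:  # stand-in for the real OTel event type (assumption)
        name: str
        body: Dict[str, Any]
        attributes: Dict[str, Any]
        timestamp: int


    # Per the tests: prompts and revised prompts are stamped with the span's
    # start_time, completions with its end_time, and each LLO attribute maps
    # to a role-specific gen_ai.<role>.message event.
    _OPENLIT_KEYS = {
        "gen_ai.prompt": ("user", "start"),
        "gen_ai.completion": ("assistant", "end"),
        "gen_ai.content.revised_prompt": ("system", "start"),
    }


    def extract_openlit_span_event_attributes(span, attributes) -> List[Event]:
        """Emit one gen_ai.<role>.message event per OpenLit LLO attribute found."""
        events = []
        for key, (role, when) in _OPENLIT_KEYS.items():
            if key not in attributes:
                continue
            events.append(
                Event(
                    name=f"gen_ai.{role}.message",
                    body={"content": attributes[key], "role": role},
                    attributes={
                        # the tests carry the span's gen_ai.system through unchanged
                        "gen_ai.system": attributes.get("gen_ai.system"),
                        "original_attribute": key,
                    },
                    timestamp=span.end_time if when == "end" else span.start_time,
                )
            )
        return events

Given a mock span with start_time=1234567890 and end_time=1234567899, this sketch reproduces
the event names, bodies, attributes, and timestamps asserted in the four OpenLit tests above.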