diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index dd5433756c..71cd9300e5 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -16,13 +16,23 @@ from aio_pika.abc import AbstractChannel, AbstractMessage from opentelemetry.instrumentation.utils import is_instrumentation_enabled +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_CONVERSATION_ID, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, +) +from opentelemetry.semconv._incubating.attributes.net_attributes import ( + NET_PEER_NAME, + NET_PEER_PORT, +) from opentelemetry.semconv.trace import ( MessagingOperationValues, SpanAttributes, ) from opentelemetry.trace import Span, SpanKind, Tracer -_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: "rabbitmq"} +_DEFAULT_ATTRIBUTES = {MESSAGING_SYSTEM: "rabbitmq"} class SpanBuilder: @@ -44,6 +54,8 @@ def set_operation(self, operation: MessagingOperationValues): def set_destination(self, destination: str): self._destination = destination + # TODO: Update this implementation once the semantic conventions for messaging stabilize + # See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/messaging.md self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination def set_channel(self, channel: AbstractChannel): @@ -61,19 +73,17 @@ def set_channel(self, channel: AbstractChannel): url = connection.url self._attributes.update( { - SpanAttributes.NET_PEER_NAME: url.host, - SpanAttributes.NET_PEER_PORT: url.port or 5672, + NET_PEER_NAME: url.host, + NET_PEER_PORT: url.port or 5672, } ) def set_message(self, message: AbstractMessage): properties = message.properties if properties.message_id: - self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = ( - properties.message_id - ) + self._attributes[MESSAGING_MESSAGE_ID] = properties.message_id if properties.correlation_id: - self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = ( + self._attributes[MESSAGING_MESSAGE_CONVERSATION_ID] = ( properties.correlation_id ) @@ -81,10 +91,10 @@ def build(self) -> Optional[Span]: if not is_instrumentation_enabled(): return None if self._operation: - self._attributes[SpanAttributes.MESSAGING_OPERATION] = ( - self._operation.value - ) + self._attributes[MESSAGING_OPERATION] = self._operation.value else: + # TODO: Update this implementation once the semantic conventions for messaging stabilize + # See: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/registry/attributes/messaging.md self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True span = self._tracer.start_span( self._generate_span_name(), diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index 431780ae7b..9be41cc844 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -6,7 +6,7 @@ AIOPIKA_VERSION_INFO = tuple(int(v) for v in aiopika_version.split(".")) MESSAGE_ID = "meesage_id" CORRELATION_ID = "correlation_id" -MESSAGING_SYSTEM = "rabbitmq" +MESSAGING_SYSTEM_VALUE = "rabbitmq" EXCHANGE_NAME = "exchange_name" QUEUE_NAME = "queue_name" ROUTING_KEY = "routing_key" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py index 39a7724bfa..ef0bcf6482 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -19,6 +19,16 @@ from opentelemetry.instrumentation.aio_pika.callback_decorator import ( CallbackDecorator, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_CONVERSATION_ID, + MESSAGING_MESSAGE_ID, + MESSAGING_OPERATION, + MESSAGING_SYSTEM, +) +from opentelemetry.semconv._incubating.attributes.net_attributes import ( + NET_PEER_NAME, + NET_PEER_PORT, +) from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import SpanKind, get_tracer @@ -30,7 +40,7 @@ EXCHANGE_NAME, MESSAGE, MESSAGE_ID, - MESSAGING_SYSTEM, + MESSAGING_SYSTEM_VALUE, QUEUE_NAME, SERVER_HOST, SERVER_PORT, @@ -40,13 +50,13 @@ @skipIf(AIOPIKA_VERSION_INFO >= (8, 0), "Only for aio_pika 7") class TestInstrumentedQueueAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { - SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE, SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, - SpanAttributes.NET_PEER_NAME: SERVER_HOST, - SpanAttributes.NET_PEER_PORT: SERVER_PORT, - SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, - SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, - SpanAttributes.MESSAGING_OPERATION: "receive", + NET_PEER_NAME: SERVER_HOST, + NET_PEER_PORT: SERVER_PORT, + MESSAGING_MESSAGE_ID: MESSAGE_ID, + MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID, + MESSAGING_OPERATION: "receive", } def setUp(self): @@ -80,13 +90,13 @@ def test_decorate_callback(self): @skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8") class TestInstrumentedQueueAioRmq8(TestCase): EXPECTED_ATTRIBUTES = { - SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE, SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, - SpanAttributes.NET_PEER_NAME: SERVER_HOST, - SpanAttributes.NET_PEER_PORT: SERVER_PORT, - SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, - SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, - SpanAttributes.MESSAGING_OPERATION: "receive", + NET_PEER_NAME: SERVER_HOST, + NET_PEER_PORT: SERVER_PORT, + MESSAGING_MESSAGE_ID: MESSAGE_ID, + MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID, + MESSAGING_OPERATION: "receive", } def setUp(self): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 90a029531d..d4aa047ef4 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -21,6 +21,15 @@ from opentelemetry.instrumentation.aio_pika.publish_decorator import ( PublishDecorator, ) +from opentelemetry.semconv._incubating.attributes.messaging_attributes import ( + MESSAGING_MESSAGE_CONVERSATION_ID, + MESSAGING_MESSAGE_ID, + MESSAGING_SYSTEM, +) +from opentelemetry.semconv._incubating.attributes.net_attributes import ( + NET_PEER_NAME, + NET_PEER_PORT, +) from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import SpanKind, get_tracer @@ -34,7 +43,7 @@ EXCHANGE_NAME, MESSAGE, MESSAGE_ID, - MESSAGING_SYSTEM, + MESSAGING_SYSTEM_VALUE, ROUTING_KEY, SERVER_HOST, SERVER_PORT, @@ -44,12 +53,12 @@ @skipIf(AIOPIKA_VERSION_INFO >= (8, 0), "Only for aio_pika 7") class TestInstrumentedExchangeAioRmq7(TestCase): EXPECTED_ATTRIBUTES = { - SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE, SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", - SpanAttributes.NET_PEER_NAME: SERVER_HOST, - SpanAttributes.NET_PEER_PORT: SERVER_PORT, - SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, - SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + NET_PEER_NAME: SERVER_HOST, + NET_PEER_PORT: SERVER_PORT, + MESSAGING_MESSAGE_ID: MESSAGE_ID, + MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID, SpanAttributes.MESSAGING_TEMP_DESTINATION: True, } @@ -123,12 +132,12 @@ def test_publish_works_with_not_recording_span_robust(self): @skipIf(AIOPIKA_VERSION_INFO <= (8, 0), "Only for aio_pika 8") class TestInstrumentedExchangeAioRmq8(TestCase): EXPECTED_ATTRIBUTES = { - SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + MESSAGING_SYSTEM: MESSAGING_SYSTEM_VALUE, SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", - SpanAttributes.NET_PEER_NAME: SERVER_HOST, - SpanAttributes.NET_PEER_PORT: SERVER_PORT, - SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, - SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + NET_PEER_NAME: SERVER_HOST, + NET_PEER_PORT: SERVER_PORT, + MESSAGING_MESSAGE_ID: MESSAGE_ID, + MESSAGING_MESSAGE_CONVERSATION_ID: CORRELATION_ID, SpanAttributes.MESSAGING_TEMP_DESTINATION: True, }