Skip to content

Commit

Permalink
SigV4 Authentication support for http/protobuf exporter (aws-observab…
Browse files Browse the repository at this point in the history
…ility#324)

*Issue #, if available:*
Adding SigV4 Authentication extension for Exporting traces to OTLP
CloudWatch endpoint without needing to explictily install the collector.

*Description of changes:*
Added a new class that extends upstream's OTLP http span exporter.
Overrides the _export method so that if the endpoint is CW, we add an
extra step of injecting SigV4 authentication to the headers.

By submitting this pull request, I confirm that you can use, modify,
copy, and redistribute this contribution, under the terms of your
choice.
  • Loading branch information
liustve authored Feb 13, 2025
2 parents 0c3ade0 + 6cf44bd commit e902760
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 20 deletions.
23 changes: 23 additions & 0 deletions aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

import sys
from logging import Logger, getLogger

import pkg_resources

_logger: Logger = getLogger(__name__)


def is_installed(req: str) -> bool:
"""Is the given required package installed?"""

if req in sys.modules and sys.modules[req] is not None:
return True

try:
pkg_resources.get_distribution(req)
except Exception as exc: # pylint: disable=broad-except
_logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc)
return False
return True
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import os
import re
from logging import Logger, getLogger
from typing import ClassVar, Dict, List, Type, Union

Expand All @@ -19,6 +20,7 @@
AwsMetricAttributesSpanExporterBuilder,
)
from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
from amazon.opentelemetry.distro.otlp_aws_span_exporter import OTLPAwsSpanExporter
from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
Expand Down Expand Up @@ -81,6 +83,7 @@
OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED_CONFIG = "OTEL_AWS_PYTHON_DEFER_TO_WORKERS_ENABLED"
SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME = "opentelemetry.instrumentation.system_metrics"
OTEL_EXPORTER_OTLP_TRACES_ENDPOINT = "OTEL_EXPORTER_OTLP_TRACES_ENDPOINT"
XRAY_OTLP_ENDPOINT_PATTERN = r"https://xray\.([a-z0-9-]+)\.amazonaws\.com/v1/traces$"
# UDP package size is not larger than 64KB
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10

Expand Down Expand Up @@ -315,6 +318,11 @@ def _customize_exporter(span_exporter: SpanExporter, resource: Resource) -> Span
traces_endpoint = os.environ.get(AWS_XRAY_DAEMON_ADDRESS_CONFIG, "127.0.0.1:2000")
span_exporter = OTLPUdpSpanExporter(endpoint=traces_endpoint)

if isinstance(span_exporter, OTLPSpanExporter) and is_xray_otlp_endpoint(
os.environ.get(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT)
):
span_exporter = OTLPAwsSpanExporter(endpoint=os.getenv(OTEL_EXPORTER_OTLP_TRACES_ENDPOINT))

if not _is_application_signals_enabled():
return span_exporter

Expand All @@ -328,6 +336,10 @@ def _customize_span_processors(provider: TracerProvider, resource: Resource) ->
# Construct and set local and remote attributes span processor
provider.add_span_processor(AttributePropagatingSpanProcessorBuilder().build())

# Do not export Application-Signals metrics if it's XRay OTLP endpoint
if is_xray_otlp_endpoint():
return

# Export 100% spans and not export Application-Signals metrics if on Lambda.
if _is_lambda_environment():
_export_unsampled_span_for_lambda(provider, resource)
Expand Down Expand Up @@ -437,6 +449,15 @@ def _is_lambda_environment():
return AWS_LAMBDA_FUNCTION_NAME_CONFIG in os.environ


def is_xray_otlp_endpoint(otlp_endpoint: str = None) -> bool:
"""Is the given endpoint the XRay OTLP endpoint?"""

if not otlp_endpoint:
return False

return bool(re.match(XRAY_OTLP_ENDPOINT_PATTERN, otlp_endpoint.lower()))


def _get_metric_export_interval():
export_interval_millis = float(os.environ.get(METRIC_EXPORT_INTERVAL_CONFIG, DEFAULT_METRIC_EXPORT_INTERVAL))
_logger.debug("Span Metrics export interval: %s", export_interval_millis)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import logging
from typing import Dict, Optional

import requests

from amazon.opentelemetry.distro._utils import is_installed
from opentelemetry.exporter.otlp.proto.http import Compression
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter

AWS_SERVICE = "xray"
_logger = logging.getLogger(__name__)


class OTLPAwsSpanExporter(OTLPSpanExporter):
"""
This exporter extends the functionality of the OTLPSpanExporter to allow spans to be exported to the
XRay OTLP endpoint https://xray.[AWSRegion].amazonaws.com/v1/traces. Utilizes the botocore
library to sign and directly inject SigV4 Authentication to the exported request's headers.
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
"""

def __init__(
self,
endpoint: Optional[str] = None,
certificate_file: Optional[str] = None,
client_key_file: Optional[str] = None,
client_certificate_file: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
timeout: Optional[int] = None,
compression: Optional[Compression] = None,
rsession: Optional[requests.Session] = None,
):

self._aws_region = None

# Requires botocore to be installed to sign the headers. However,
# some users might not need to use this exporter. In order not conflict
# with existing behavior, we check for botocore before initializing this exporter.

if endpoint and is_installed("botocore"):
# pylint: disable=import-outside-toplevel
from botocore import auth, awsrequest, session

self.boto_auth = auth
self.boto_aws_request = awsrequest
self.boto_session = session.Session()

# Assumes only valid endpoints passed are of XRay OTLP format.
# The only usecase for this class would be for ADOT Python Auto Instrumentation and that already validates
# the endpoint to be an XRay OTLP endpoint.
self._aws_region = endpoint.split(".")[1]

else:
_logger.error(
"botocore is required to export traces to %s. Please install it using `pip install botocore`",
endpoint,
)

super().__init__(
endpoint=endpoint,
certificate_file=certificate_file,
client_key_file=client_key_file,
client_certificate_file=client_certificate_file,
headers=headers,
timeout=timeout,
compression=compression,
session=rsession,
)

# Overrides upstream's private implementation of _export. All behaviors are
# the same except if the endpoint is an XRay OTLP endpoint, we will sign the request
# with SigV4 in headers before sending it to the endpoint. Otherwise, we will skip signing.
def _export(self, serialized_data: bytes):
request = self.boto_aws_request.AWSRequest(
method="POST",
url=self._endpoint,
data=serialized_data,
headers={"Content-Type": "application/x-protobuf"},
)

credentials = self.boto_session.get_credentials()

if credentials is not None:
signer = self.boto_auth.SigV4Auth(credentials, AWS_SERVICE, self._aws_region)

try:
signer.add_auth(request)
self._session.headers.update(dict(request.headers))

except Exception as signing_error: # pylint: disable=broad-except
_logger.error("Failed to sign request: %s", signing_error)

return super()._export(serialized_data)
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
# SPDX-License-Identifier: Apache-2.0
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
import os
import sys
from logging import Logger, getLogger

import pkg_resources

from amazon.opentelemetry.distro._utils import is_installed
from amazon.opentelemetry.distro.patches._resource_detector_patches import _apply_resource_detector_patches

# Env variable for determining whether we want to monkey patch gevent modules. Possible values are 'all', 'none', and
Expand All @@ -25,7 +23,7 @@ def apply_instrumentation_patches() -> None:
Where possible, automated testing should be run to catch upstream changes resulting in broken patches
"""
if _is_installed("gevent"):
if is_installed("gevent"):
try:
gevent_patch_module = os.environ.get(AWS_GEVENT_PATCH_MODULES, "all")

Expand Down Expand Up @@ -56,7 +54,7 @@ def apply_instrumentation_patches() -> None:
except Exception as exc: # pylint: disable=broad-except
_logger.info("Failed to monkey patch gevent, exception: %s", exc)

if _is_installed("botocore ~= 1.0"):
if is_installed("botocore ~= 1.0"):
# pylint: disable=import-outside-toplevel
# Delay import to only occur if patches is safe to apply (e.g. the instrumented library is installed).
from amazon.opentelemetry.distro.patches._botocore_patches import _apply_botocore_instrumentation_patches
Expand All @@ -66,15 +64,3 @@ def apply_instrumentation_patches() -> None:
# No need to check if library is installed as this patches opentelemetry.sdk,
# which must be installed for the distro to work at all.
_apply_resource_detector_patches()


def _is_installed(req: str) -> bool:
if req in sys.modules:
return True

try:
pkg_resources.get_distribution(req)
except Exception as exc: # pylint: disable=broad-except
_logger.debug("Skipping instrumentation patch: package %s, exception: %s", req, exc)
return False
return True
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
_LAMBDA_SOURCE_MAPPING_ID: str = "lambdaEventSourceMappingID"

# Patch names
GET_DISTRIBUTION_PATCH: str = (
"amazon.opentelemetry.distro.patches._instrumentation_patch.pkg_resources.get_distribution"
)
GET_DISTRIBUTION_PATCH: str = "amazon.opentelemetry.distro._utils.pkg_resources.get_distribution"


class TestInstrumentationPatch(TestCase):
Expand Down
Loading

0 comments on commit e902760

Please sign in to comment.