Skip to content

Commit 8584b18

Browse files
committed
Add CloudWatch EMF exporter integration to AWS OpenTelemetry configurator
1 parent aed584f commit 8584b18

File tree

2 files changed

+293
-12
lines changed

2 files changed

+293
-12
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 114 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import os
55
import re
66
from logging import NOTSET, Logger, getLogger
7-
from typing import ClassVar, Dict, List, Type, Union
7+
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union
88

99
from importlib_metadata import version
1010
from typing_extensions import override
1111

1212
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
1313
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
14-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
14+
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
1515
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1616
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1717
AttributePropagatingSpanProcessorBuilder,
@@ -98,6 +98,7 @@
9898

9999
AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
100100
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
101+
AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
101102

102103
# UDP package size is not larger than 64KB
103104
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
@@ -113,6 +114,13 @@
113114
_logger: Logger = getLogger(__name__)
114115

115116

117+
class OtlpLogHeaderSetting(NamedTuple):
118+
log_group: Optional[str]
119+
log_stream: Optional[str]
120+
namespace: Optional[str]
121+
is_valid: bool
122+
123+
116124
class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
117125
"""
118126
This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
@@ -141,6 +149,11 @@ def _configure(self, **kwargs):
141149
# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
142150
# internal logic.
143151
def _initialize_components():
152+
# Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
153+
# from _import_exporters in OTel dependencies which would try to load exporters
154+
# We will contribute emf exporter to upstream for supporting OTel metrics in SDK
155+
is_emf_enabled = _check_emf_exporter_enabled()
156+
144157
trace_exporters, metric_exporters, log_exporters = _import_exporters(
145158
_get_exporter_names("traces"),
146159
_get_exporter_names("metrics"),
@@ -176,7 +189,8 @@ def _initialize_components():
176189
sampler=sampler,
177190
resource=resource,
178191
)
179-
_init_metrics(metric_exporters, resource)
192+
193+
_init_metrics(metric_exporters, resource, is_emf_enabled)
180194
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
181195
if logging_enabled.strip().lower() == "true":
182196
_init_logging(log_exporters, resource)
@@ -235,6 +249,7 @@ def _init_tracing(
235249
def _init_metrics(
236250
exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]],
237251
resource: Resource = None,
252+
is_emf_enabled: bool = False,
238253
):
239254
metric_readers = []
240255
views = []
@@ -247,7 +262,7 @@ def _init_metrics(
247262
else:
248263
metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args)))
249264

250-
_customize_metric_exporters(metric_readers, views)
265+
_customize_metric_exporters(metric_readers, views, is_emf_enabled)
251266

252267
provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views)
253268
set_meter_provider(provider)
@@ -397,7 +412,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
397412
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
398413
_logger.info("Detected using AWS OTLP Logs Endpoint.")
399414

400-
if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers():
415+
if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
401416
# Setting default compression mode to Gzip as this is the behavior in upstream's
402417
# collector otlp http exporter:
403418
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -450,7 +465,9 @@ def session_id_predicate(baggage_key: str) -> bool:
450465
return
451466

452467

453-
def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
468+
def _customize_metric_exporters(
469+
metric_readers: List[MetricReader], views: List[View], is_emf_enabled: bool = False
470+
) -> None:
454471
if _is_application_signals_runtime_enabled():
455472
_get_runtime_metric_views(views, 0 == len(metric_readers))
456473

@@ -462,6 +479,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
462479
)
463480
metric_readers.append(scope_based_periodic_exporting_metric_reader)
464481

482+
if is_emf_enabled:
483+
emf_exporter = create_emf_exporter()
484+
if emf_exporter:
485+
metric_readers.append(PeriodicExportingMetricReader(emf_exporter))
486+
465487

466488
def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
467489
runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -551,7 +573,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
551573
return bool(re.match(pattern, otlp_endpoint.lower()))
552574

553575

554-
def _validate_logs_headers() -> bool:
576+
def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
555577
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
556578
AWS OTLP Logs endpoint."""
557579

@@ -562,26 +584,36 @@ def _validate_logs_headers() -> bool:
562584
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
563585
"to include x-aws-log-group and x-aws-log-stream"
564586
)
565-
return False
587+
return OtlpLogHeaderSetting(None, None, None, False)
566588

589+
log_group = None
590+
log_stream = None
591+
namespace = None
567592
filtered_log_headers_count = 0
568593

569594
for pair in logs_headers.split(","):
570595
if "=" in pair:
571596
split = pair.split("=", 1)
572597
key = split[0]
573598
value = split[1]
574-
if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value:
599+
if key == AWS_OTLP_LOGS_GROUP_HEADER and value:
600+
log_group = value
601+
filtered_log_headers_count += 1
602+
elif key == AWS_OTLP_LOGS_STREAM_HEADER and value:
603+
log_stream = value
575604
filtered_log_headers_count += 1
605+
elif key == AWS_EMF_METRICS_NAMESPACE and value:
606+
namespace = value
576607

577-
if filtered_log_headers_count != 2:
608+
is_valid = filtered_log_headers_count == 2
609+
610+
if not is_valid:
578611
_logger.warning(
579612
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
580613
"to have values for x-aws-log-group and x-aws-log-stream"
581614
)
582-
return False
583615

584-
return True
616+
return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid)
585617

586618

587619
def _get_metric_export_interval():
@@ -652,3 +684,73 @@ def create_exporter(self):
652684
)
653685

654686
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
687+
688+
689+
def _check_emf_exporter_enabled() -> bool:
690+
"""
691+
Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
692+
and updates the environment variable.
693+
694+
Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
695+
from _import_exporters in OTel dependencies which would try to load exporters
696+
We will contribute emf exporter to upstream for supporting OTel metrics in SDK
697+
698+
Returns:
699+
bool: True if "awsemf" was found and removed, False otherwise.
700+
"""
701+
# Get the current exporter value
702+
exporter_value = os.environ.get("OTEL_METRICS_EXPORTER", "")
703+
704+
# Check if it's empty
705+
if not exporter_value:
706+
return False
707+
708+
# Split by comma and convert to list
709+
exporters = [exp.strip() for exp in exporter_value.split(",")]
710+
711+
# Check if awsemf is in the list
712+
if "awsemf" not in exporters:
713+
return False
714+
715+
# Remove awsemf from the list
716+
exporters.remove("awsemf")
717+
718+
# Join the remaining exporters and update the environment variable
719+
new_value = ",".join(exporters) if exporters else ""
720+
721+
# Set the new value (or unset if empty)
722+
if new_value:
723+
os.environ["OTEL_METRICS_EXPORTER"] = new_value
724+
elif "OTEL_METRICS_EXPORTER" in os.environ:
725+
del os.environ["OTEL_METRICS_EXPORTER"]
726+
727+
return True
728+
729+
730+
def create_emf_exporter():
731+
"""Create and configure the CloudWatch EMF exporter."""
732+
try:
733+
# Check if botocore is available before importing the EMF exporter
734+
if not is_installed("botocore"):
735+
_logger.warning("botocore is not installed. EMF exporter requires botocore")
736+
return None
737+
738+
# pylint: disable=import-outside-toplevel
739+
from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import (
740+
AwsCloudWatchEmfExporter,
741+
)
742+
743+
log_header_setting = _validate_and_fetch_logs_header()
744+
745+
if not log_header_setting.is_valid:
746+
return None
747+
748+
return AwsCloudWatchEmfExporter(
749+
namespace=log_header_setting.namespace,
750+
log_group_name=log_header_setting.log_group,
751+
log_stream_name=log_header_setting.log_stream,
752+
)
753+
# pylint: disable=broad-exception-caught
754+
except Exception as errors:
755+
_logger.error("Failed to create EMF exporter: %s", errors)
756+
return None

0 commit comments

Comments
 (0)