Skip to content

Commit 12fabd6

Browse files
authored
Add CloudWatch EMF exporter integration (#410)
*Description of changes:* Add CloudWatch EMF exporter integration to AWS OpenTelemetry configurator - Add CloudWatchEMFExporter integration to metrics initialization - Add create_emf_exporter() to instantiate CloudWatch EMF exporter with proper configuration - Update _customize_metric_exporters to support EMF exporter when enabled - Add comprehensive unit tests for new EMF functionality *Note:* With Agent Observability flag is on, ADOT will set `OTEL_METRICS_EXPORTER=awsemf` by default, but `awsemf` is not yet contributed to OTel upstream so it will be removed from the OTEL_METRICS_EXPORTER value list once EmfExporter is created By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent 04b72bb commit 12fabd6

File tree

2 files changed

+293
-12
lines changed

2 files changed

+293
-12
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 114 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import os
55
import re
66
from logging import NOTSET, Logger, getLogger
7-
from typing import ClassVar, Dict, List, Type, Union
7+
from typing import ClassVar, Dict, List, NamedTuple, Optional, Type, Union
88

99
from importlib_metadata import version
1010
from typing_extensions import override
1111

1212
from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
1313
from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
14-
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
14+
from amazon.opentelemetry.distro._utils import is_agent_observability_enabled, is_installed
1515
from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
1616
from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
1717
AttributePropagatingSpanProcessorBuilder,
@@ -98,6 +98,7 @@
9898

9999
AWS_OTLP_LOGS_GROUP_HEADER = "x-aws-log-group"
100100
AWS_OTLP_LOGS_STREAM_HEADER = "x-aws-log-stream"
101+
AWS_EMF_METRICS_NAMESPACE = "x-aws-metric-namespace"
101102

102103
# UDP package size is not larger than 64KB
103104
LAMBDA_SPAN_EXPORT_BATCH_SIZE = 10
@@ -113,6 +114,13 @@
113114
_logger: Logger = getLogger(__name__)
114115

115116

117+
class OtlpLogHeaderSetting(NamedTuple):
118+
log_group: Optional[str]
119+
log_stream: Optional[str]
120+
namespace: Optional[str]
121+
is_valid: bool
122+
123+
116124
class AwsOpenTelemetryConfigurator(_OTelSDKConfigurator):
117125
"""
118126
This AwsOpenTelemetryConfigurator extend _OTelSDKConfigurator configuration with the following change:
@@ -141,6 +149,11 @@ def _configure(self, **kwargs):
141149
# Long term, we wish to contribute this to upstream to improve initialization customizability and reduce dependency on
142150
# internal logic.
143151
def _initialize_components():
152+
# Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
153+
# from _import_exporters in OTel dependencies which would try to load exporters
154+
# We will contribute emf exporter to upstream for supporting OTel metrics in SDK
155+
is_emf_enabled = _check_emf_exporter_enabled()
156+
144157
trace_exporters, metric_exporters, log_exporters = _import_exporters(
145158
_get_exporter_names("traces"),
146159
_get_exporter_names("metrics"),
@@ -176,7 +189,8 @@ def _initialize_components():
176189
sampler=sampler,
177190
resource=resource,
178191
)
179-
_init_metrics(metric_exporters, resource)
192+
193+
_init_metrics(metric_exporters, resource, is_emf_enabled)
180194
logging_enabled = os.getenv(_OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED, "false")
181195
if logging_enabled.strip().lower() == "true":
182196
_init_logging(log_exporters, resource)
@@ -235,6 +249,7 @@ def _init_tracing(
235249
def _init_metrics(
236250
exporters_or_readers: Dict[str, Union[Type[MetricExporter], Type[MetricReader]]],
237251
resource: Resource = None,
252+
is_emf_enabled: bool = False,
238253
):
239254
metric_readers = []
240255
views = []
@@ -247,7 +262,7 @@ def _init_metrics(
247262
else:
248263
metric_readers.append(PeriodicExportingMetricReader(exporter_or_reader_class(**exporter_args)))
249264

250-
_customize_metric_exporters(metric_readers, views)
265+
_customize_metric_exporters(metric_readers, views, is_emf_enabled)
251266

252267
provider = MeterProvider(resource=resource, metric_readers=metric_readers, views=views)
253268
set_meter_provider(provider)
@@ -408,7 +423,7 @@ def _customize_logs_exporter(log_exporter: LogExporter, resource: Resource) -> L
408423
if _is_aws_otlp_endpoint(logs_endpoint, "logs"):
409424
_logger.info("Detected using AWS OTLP Logs Endpoint.")
410425

411-
if isinstance(log_exporter, OTLPLogExporter) and _validate_logs_headers():
426+
if isinstance(log_exporter, OTLPLogExporter) and _validate_and_fetch_logs_header().is_valid:
412427
# Setting default compression mode to Gzip as this is the behavior in upstream's
413428
# collector otlp http exporter:
414429
# https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlphttpexporter
@@ -466,7 +481,9 @@ def session_id_predicate(baggage_key: str) -> bool:
466481
return
467482

468483

469-
def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[View]) -> None:
484+
def _customize_metric_exporters(
485+
metric_readers: List[MetricReader], views: List[View], is_emf_enabled: bool = False
486+
) -> None:
470487
if _is_application_signals_runtime_enabled():
471488
_get_runtime_metric_views(views, 0 == len(metric_readers))
472489

@@ -478,6 +495,11 @@ def _customize_metric_exporters(metric_readers: List[MetricReader], views: List[
478495
)
479496
metric_readers.append(scope_based_periodic_exporting_metric_reader)
480497

498+
if is_emf_enabled:
499+
emf_exporter = create_emf_exporter()
500+
if emf_exporter:
501+
metric_readers.append(PeriodicExportingMetricReader(emf_exporter))
502+
481503

482504
def _get_runtime_metric_views(views: List[View], retain_runtime_only: bool) -> None:
483505
runtime_metrics_scope_name = SYSTEM_METRICS_INSTRUMENTATION_SCOPE_NAME
@@ -567,7 +589,7 @@ def _is_aws_otlp_endpoint(otlp_endpoint: str = None, service: str = "xray") -> b
567589
return bool(re.match(pattern, otlp_endpoint.lower()))
568590

569591

570-
def _validate_logs_headers() -> bool:
592+
def _validate_and_fetch_logs_header() -> OtlpLogHeaderSetting:
571593
"""Checks if x-aws-log-group and x-aws-log-stream are present in the headers in order to send logs to
572594
AWS OTLP Logs endpoint."""
573595

@@ -578,26 +600,36 @@ def _validate_logs_headers() -> bool:
578600
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
579601
"to include x-aws-log-group and x-aws-log-stream"
580602
)
581-
return False
603+
return OtlpLogHeaderSetting(None, None, None, False)
582604

605+
log_group = None
606+
log_stream = None
607+
namespace = None
583608
filtered_log_headers_count = 0
584609

585610
for pair in logs_headers.split(","):
586611
if "=" in pair:
587612
split = pair.split("=", 1)
588613
key = split[0]
589614
value = split[1]
590-
if key in (AWS_OTLP_LOGS_GROUP_HEADER, AWS_OTLP_LOGS_STREAM_HEADER) and value:
615+
if key == AWS_OTLP_LOGS_GROUP_HEADER and value:
616+
log_group = value
617+
filtered_log_headers_count += 1
618+
elif key == AWS_OTLP_LOGS_STREAM_HEADER and value:
619+
log_stream = value
591620
filtered_log_headers_count += 1
621+
elif key == AWS_EMF_METRICS_NAMESPACE and value:
622+
namespace = value
592623

593-
if filtered_log_headers_count != 2:
624+
is_valid = filtered_log_headers_count == 2 and log_group is not None and log_stream is not None
625+
626+
if not is_valid:
594627
_logger.warning(
595628
"Improper configuration: Please configure the environment variable OTEL_EXPORTER_OTLP_LOGS_HEADERS "
596629
"to have values for x-aws-log-group and x-aws-log-stream"
597630
)
598-
return False
599631

600-
return True
632+
return OtlpLogHeaderSetting(log_group, log_stream, namespace, is_valid)
601633

602634

603635
def _get_metric_export_interval():
@@ -668,3 +700,73 @@ def create_exporter(self):
668700
)
669701

670702
raise RuntimeError(f"Unsupported AWS Application Signals export protocol: {protocol} ")
703+
704+
705+
def _check_emf_exporter_enabled() -> bool:
706+
"""
707+
Checks if OTEL_METRICS_EXPORTER contains "awsemf", removes it if present,
708+
and updates the environment variable.
709+
710+
Remove 'awsemf' from OTEL_METRICS_EXPORTER if present to prevent validation errors
711+
from _import_exporters in OTel dependencies which would try to load exporters
712+
We will contribute emf exporter to upstream for supporting OTel metrics in SDK
713+
714+
Returns:
715+
bool: True if "awsemf" was found and removed, False otherwise.
716+
"""
717+
# Get the current exporter value
718+
exporter_value = os.environ.get("OTEL_METRICS_EXPORTER", "")
719+
720+
# Check if it's empty
721+
if not exporter_value:
722+
return False
723+
724+
# Split by comma and convert to list
725+
exporters = [exp.strip() for exp in exporter_value.split(",")]
726+
727+
# Check if awsemf is in the list
728+
if "awsemf" not in exporters:
729+
return False
730+
731+
# Remove awsemf from the list
732+
exporters.remove("awsemf")
733+
734+
# Join the remaining exporters and update the environment variable
735+
new_value = ",".join(exporters) if exporters else ""
736+
737+
# Set the new value (or unset if empty)
738+
if new_value:
739+
os.environ["OTEL_METRICS_EXPORTER"] = new_value
740+
elif "OTEL_METRICS_EXPORTER" in os.environ:
741+
del os.environ["OTEL_METRICS_EXPORTER"]
742+
743+
return True
744+
745+
746+
def create_emf_exporter():
747+
"""Create and configure the CloudWatch EMF exporter."""
748+
try:
749+
# Check if botocore is available before importing the EMF exporter
750+
if not is_installed("botocore"):
751+
_logger.warning("botocore is not installed. EMF exporter requires botocore")
752+
return None
753+
754+
# pylint: disable=import-outside-toplevel
755+
from amazon.opentelemetry.distro.exporter.aws.metrics.aws_cloudwatch_emf_exporter import (
756+
AwsCloudWatchEmfExporter,
757+
)
758+
759+
log_header_setting = _validate_and_fetch_logs_header()
760+
761+
if not log_header_setting.is_valid:
762+
return None
763+
764+
return AwsCloudWatchEmfExporter(
765+
namespace=log_header_setting.namespace,
766+
log_group_name=log_header_setting.log_group,
767+
log_stream_name=log_header_setting.log_stream,
768+
)
769+
# pylint: disable=broad-exception-caught
770+
except Exception as errors:
771+
_logger.error("Failed to create EMF exporter: %s", errors)
772+
return None

0 commit comments

Comments
 (0)