
Commit f1c49fe

[WIP] Support for handling LLO in OTLP AWS Logs Exporter (#364)
Adds support in the ADOT auto instrumentation for handling LLO log events.

*Description of changes:*

1. Adds `AwsBatchLogRecordProcessor`, a backwards-compatible custom logs BatchProcessor with the following invariants:
   - The unserialized, uncompressed data size of exported batches will ALWAYS be <= 1 MB, except for the case below:
   - If the data size of an exported batch is ever > 1 MB, then that batch always has length 1.
2. `OTLPAwsLogExporter`: Adds a new retry-delay behavior based on the server-side `Retry-After` response header. Injects the LLO header flag if the size of the exported data is > 1 MB.
3. Customizes the auto instrumentation to use the new `AwsBatchLogRecordProcessor`.

*Testing:* TODO:

1. Add unit tests to validate the behavior of `AwsBatchLogRecordProcessor` and `OTLPAwsLogExporter`.
2. E2E testing to validate any performance hits and compatibility with #361.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
1 parent b972d72 commit f1c49fe
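As a rough sketch of how the new pieces are wired together (the endpoint below is an illustrative placeholder; in the distro it comes from `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT`, and the processor's batching parameters are left at their defaults):

```python
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
    AwsBatchLogRecordProcessor,
)
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.sdk._logs import LoggerProvider

# Placeholder CloudWatch OTLP Logs endpoint, used only to show the shape of the wiring.
exporter = OTLPAwsLogExporter(endpoint="https://logs.us-east-1.amazonaws.com/v1/logs")

provider = LoggerProvider()
# The custom processor keeps each exported batch's uncompressed size <= 1 MB,
# or exports a single oversized record on its own and flags it for the exporter.
provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=exporter))
```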

File tree

11 files changed: +609 −22 lines

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 2 additions & 0 deletions
@@ -11,6 +11,7 @@
 
 AGENT_OBSERVABILITY_ENABLED = "AGENT_OBSERVABILITY_ENABLED"
 
+
 def is_installed(req: str) -> bool:
     """Is the given required package installed?"""
 
@@ -24,5 +25,6 @@ def is_installed(req: str) -> bool:
         return False
     return True
 
+
 def is_agent_observability_enabled() -> bool:
     return os.environ.get(AGENT_OBSERVABILITY_ENABLED, "false").lower() == "true"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/aws_opentelemetry_configurator.py

Lines changed: 12 additions & 11 deletions
@@ -10,8 +10,8 @@
 from typing_extensions import override
 
 from amazon.opentelemetry.distro._aws_attribute_keys import AWS_LOCAL_SERVICE
-from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro._aws_resource_attribute_configurator import get_service_attribute
+from amazon.opentelemetry.distro._utils import is_agent_observability_enabled
 from amazon.opentelemetry.distro.always_record_sampler import AlwaysRecordSampler
 from amazon.opentelemetry.distro.attribute_propagating_span_processor_builder import (
     AttributePropagatingSpanProcessorBuilder,
@@ -22,13 +22,14 @@
     AwsMetricAttributesSpanExporterBuilder,
 )
 from amazon.opentelemetry.distro.aws_span_metrics_processor_builder import AwsSpanMetricsProcessorBuilder
+from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import AwsBatchLogRecordProcessor
 from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
 from amazon.opentelemetry.distro.exporter.otlp.aws.traces.otlp_aws_span_exporter import OTLPAwsSpanExporter
 from amazon.opentelemetry.distro.otlp_udp_exporter import OTLPUdpSpanExporter
 from amazon.opentelemetry.distro.sampler.aws_xray_remote_sampler import AwsXRayRemoteSampler
 from amazon.opentelemetry.distro.scope_based_exporter import ScopeBasedPeriodicExportingMetricReader
 from amazon.opentelemetry.distro.scope_based_filtering_view import ScopeBasedRetainingView
-from opentelemetry._logs import set_logger_provider, get_logger_provider
+from opentelemetry._logs import get_logger_provider, set_logger_provider
 from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
 from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter as OTLPHttpOTLPMetricExporter
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
@@ -83,6 +84,7 @@
 DEPRECATED_APP_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APP_SIGNALS_EXPORTER_ENDPOINT"
 APPLICATION_SIGNALS_EXPORTER_ENDPOINT_CONFIG = "OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT"
 METRIC_EXPORT_INTERVAL_CONFIG = "OTEL_METRIC_EXPORT_INTERVAL"
+OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"
 DEFAULT_METRIC_EXPORT_INTERVAL = 60000.0
 AWS_LAMBDA_FUNCTION_NAME_CONFIG = "AWS_LAMBDA_FUNCTION_NAME"
 AWS_XRAY_DAEMON_ADDRESS_CONFIG = "AWS_XRAY_DAEMON_ADDRESS"
@@ -181,9 +183,9 @@ def _init_logging(
     resource: Resource = None,
 ):
 
-    # Provides a default OTLP log exporter when none is specified.
+    # Provides a default OTLP log exporter when the environment is not set.
     # This is the behavior for the logs exporters for other languages.
-    if not exporters:
+    if not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None:
         exporters = {"otlp": OTLPLogExporter}
 
     provider = LoggerProvider(resource=resource)
@@ -192,7 +194,11 @@ def _init_logging(
     for _, exporter_class in exporters.items():
         exporter_args: Dict[str, any] = {}
         log_exporter = _customize_logs_exporter(exporter_class(**exporter_args), resource)
-        provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
+
+        if isinstance(log_exporter, OTLPAwsLogExporter):
+            provider.add_log_record_processor(AwsBatchLogRecordProcessor(exporter=log_exporter))
+        else:
+            provider.add_log_record_processor(BatchLogRecordProcessor(exporter=log_exporter))
 
     handler = LoggingHandler(level=NOTSET, logger_provider=provider)
 
@@ -364,12 +370,7 @@ def _customize_span_exporter(span_exporter: SpanExporter, resource: Resource) ->
 
     if isinstance(span_exporter, OTLPSpanExporter):
         if is_agent_observability_enabled():
-            logs_endpoint = os.getenv(OTEL_EXPORTER_OTLP_LOGS_ENDPOINT)
-            logs_exporter = OTLPAwsLogExporter(endpoint=logs_endpoint)
-            span_exporter = OTLPAwsSpanExporter(
-                endpoint=traces_endpoint,
-                logger_provider=get_logger_provider()
-            )
+            span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint, logger_provider=get_logger_provider())
         else:
             span_exporter = OTLPAwsSpanExporter(endpoint=traces_endpoint)
 
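To make the new default-exporter behavior concrete, here is a small self-contained sketch of the guard added to `_init_logging`; the helper function name is illustrative only and not part of the distro.

```python
import os

OTEL_LOGS_EXPORTER = "OTEL_LOGS_EXPORTER"


def should_add_default_otlp_log_exporter(exporters: dict) -> bool:
    # Mirror of the new check: fall back to the default OTLP logs exporter only
    # when nothing was configured AND the user did not set OTEL_LOGS_EXPORTER at all.
    return not exporters and os.environ.get(OTEL_LOGS_EXPORTER) is None


# With nothing configured, the fallback applies.
assert should_add_default_otlp_log_exporter({}) is True

# An explicit OTEL_LOGS_EXPORTER (e.g. "none") now suppresses the fallback
# instead of being overwritten with the default OTLP exporter.
os.environ[OTEL_LOGS_EXPORTER] = "none"
assert should_add_default_otlp_log_exporter({}) is False
```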

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/aws_batch_log_record_processor.py

Lines changed: 122 additions & 0 deletions

@@ -0,0 +1,122 @@
import logging
from typing import Mapping, Sequence

from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter
from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs._internal.export import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    BatchLogExportStrategy,
    attach,
    detach,
    set_value,
)
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.util.types import AnyValue

_logger = logging.getLogger(__name__)

BASE_LOG_BUFFER_BYTE_SIZE = 2000
MAX_LOG_REQUEST_BYTE_SIZE = (
    1048576  # https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-OTLPEndpoint.html
)


class AwsBatchLogRecordProcessor(BatchLogRecordProcessor):

    def __init__(
        self,
        exporter: OTLPAwsLogExporter,
        schedule_delay_millis: float | None = None,
        max_export_batch_size: int | None = None,
        export_timeout_millis: float | None = None,
        max_queue_size: int | None = None,
    ):

        super().__init__(
            exporter=exporter,
            schedule_delay_millis=schedule_delay_millis,
            max_export_batch_size=max_export_batch_size,
            export_timeout_millis=export_timeout_millis,
            max_queue_size=max_queue_size,
        )

    # https://github.com/open-telemetry/opentelemetry-python/blob/main/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py#L143
    def _export(self, batch_strategy: BatchLogExportStrategy) -> None:
        """
        Preserves existing batching behavior but will intermediately export small log batches if
        the size of the data in the batch is at or above AWS CloudWatch's maximum request size limit of 1 MB.

        - Data size of exported batches will ALWAYS be <= 1 MB except for the case below:
            - If the data size of an exported batch is ever > 1 MB then the batch size is guaranteed to be 1
        """

        with self._export_lock:
            iteration = 0
            # We could see concurrent export calls from worker and force_flush. We call _should_export_batch
            # once the lock is obtained to see if we still need to make the requested export.
            while self._should_export_batch(batch_strategy, iteration):

                iteration += 1
                token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
                try:
                    batch_length = min(self._max_export_batch_size, len(self._queue))
                    batch_data_size = 0
                    batch = []

                    for _ in range(batch_length):

                        log_data = self._queue.pop()
                        log_size = self._get_size_of_log(log_data)

                        if batch and (batch_data_size + log_size > MAX_LOG_REQUEST_BYTE_SIZE):
                            # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                            if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                                self._exporter.set_gen_ai_flag()

                            self._exporter.export(batch)
                            batch_data_size = 0
                            batch = []

                        batch_data_size += log_size
                        batch.append(log_data)

                    if batch:
                        # if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE then len(batch) == 1
                        if batch_data_size > MAX_LOG_REQUEST_BYTE_SIZE:
                            self._exporter.set_gen_ai_flag()

                        self._exporter.export(batch)
                except Exception:  # pylint: disable=broad-exception-caught
                    _logger.exception("Exception while exporting logs.")
                detach(token)

    @staticmethod
    def _get_size_of_log(log_data: LogData) -> int:
        """
        Estimates the size of a given LogData based on the size of the body + a buffer
        amount representing a rough guess of other data present in the log.
        """
        size = BASE_LOG_BUFFER_BYTE_SIZE
        body = log_data.log_record.body

        if body:
            size += AwsBatchLogRecordProcessor._get_size_of_any_value(body)

        return size

    @staticmethod
    def _get_size_of_any_value(val: AnyValue, seen=None) -> int:
        """
        Recursively calculates the size of an AnyValue type in bytes.
        """

        if isinstance(val, (str, bytes)):
            return len(val)

        if isinstance(val, bool):
            return 4 if val else 5

        if isinstance(val, (float, int)):
            return len(str(val))

        return 0
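Unit tests are still listed as TODO; as a unit-test-style sketch of the size estimate above (the scope name and body value are arbitrary placeholders):

```python
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.aws_batch_log_record_processor import (
    BASE_LOG_BUFFER_BYTE_SIZE,
    AwsBatchLogRecordProcessor,
)
from opentelemetry.sdk._logs import LogData, LogRecord
from opentelemetry.sdk.util.instrumentation import InstrumentationScope

log = LogData(
    log_record=LogRecord(body="x" * 1000),
    instrumentation_scope=InstrumentationScope("test-scope"),
)

# Estimated size = body length in bytes + the fixed 2000-byte buffer
# that stands in for the rest of the record's fields.
assert AwsBatchLogRecordProcessor._get_size_of_log(log) == 1000 + BASE_LOG_BUFFER_BYTE_SIZE
```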

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/exporter/otlp/aws/logs/otlp_aws_logs_exporter.py

Lines changed: 151 additions & 2 deletions
@@ -1,14 +1,32 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 
-from typing import Dict, Optional
+import gzip
+import logging
+from io import BytesIO
+from time import sleep
+from typing import Dict, Optional, Sequence
+
+import requests
 
 from amazon.opentelemetry.distro.exporter.otlp.aws.common.aws_auth_session import AwsAuthSession
+from opentelemetry.exporter.otlp.proto.common._log_encoder import encode_logs
 from opentelemetry.exporter.otlp.proto.http import Compression
-from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
+from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter, _create_exp_backoff_generator
+from opentelemetry.sdk._logs import (
+    LogData,
+)
+from opentelemetry.sdk._logs.export import (
+    LogExportResult,
+)
+
+_logger = logging.getLogger(__name__)
 
 
 class OTLPAwsLogExporter(OTLPLogExporter):
+    _LARGE_LOG_HEADER = {"x-aws-log-semantics": "otel"}
+    _RETRY_AFTER_HEADER = "Retry-After"  # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
+
     def __init__(
         self,
         endpoint: Optional[str] = None,
@@ -18,6 +36,7 @@ def __init__(
         headers: Optional[Dict[str, str]] = None,
         timeout: Optional[int] = None,
     ):
+        self._gen_ai_flag = False
         self._aws_region = None
 
         if endpoint:
@@ -34,3 +53,133 @@ def __init__(
             compression=Compression.Gzip,
             session=AwsAuthSession(aws_region=self._aws_region, service="logs"),
         )
+
+    # https://github.com/open-telemetry/opentelemetry-python/blob/main/exporter/opentelemetry-exporter-otlp-proto-http/src/opentelemetry/exporter/otlp/proto/http/_log_exporter/__init__.py#L167
+    def export(self, batch: Sequence[LogData]) -> LogExportResult:
+        """
+        Exports the given batch of OTLP log data.
+        Behaviors of how this export will work -
+
+        1. Always compresses the serialized data into gzip before sending.
+
+        2. If self._gen_ai_flag is enabled, the log data is > 1 MB and the assumption is
+           that the log is a normalized gen_ai LogEvent.
+            - inject the 'x-aws-log-semantics' flag into the header.
+
+        3. Retry behavior is now the following:
+            - if the response contains a status code that is retryable and the response contains Retry-After in its
+              headers, the serialized data will be exported after that set delay
+
+            - if the response does not contain that Retry-After header, default back to the current iteration of the
+              exponential backoff delay
+        """
+
+        if self._shutdown:
+            _logger.warning("Exporter already shutdown, ignoring batch")
+            return LogExportResult.FAILURE
+
+        serialized_data = encode_logs(batch).SerializeToString()
+
+        gzip_data = BytesIO()
+        with gzip.GzipFile(fileobj=gzip_data, mode="w") as gzip_stream:
+            gzip_stream.write(serialized_data)
+
+        data = gzip_data.getvalue()
+
+        backoff = _create_exp_backoff_generator(max_value=self._MAX_RETRY_TIMEOUT)
+
+        while True:
+            resp = self._send(data)
+
+            if resp.ok:
+                return LogExportResult.SUCCESS
+
+            if not self._retryable(resp):
+                _logger.error(
+                    "Failed to export logs batch code: %s, reason: %s",
+                    resp.status_code,
+                    resp.text,
+                )
+                self._gen_ai_flag = False
+                return LogExportResult.FAILURE
+
+            # https://opentelemetry.io/docs/specs/otlp/#otlphttp-throttling
+            maybe_retry_after = resp.headers.get(self._RETRY_AFTER_HEADER, None)
+
+            # Set the next retry delay to the value of the Retry-After response in the headers.
+            # If Retry-After is not present in the headers, default to the next iteration of the
+            # exponential backoff strategy.
+
+            delay = self._parse_retryable_header(maybe_retry_after)
+
+            if delay == -1:
+                delay = next(backoff, self._MAX_RETRY_TIMEOUT)
+
+            if delay == self._MAX_RETRY_TIMEOUT:
+                _logger.error(
+                    "Transient error %s encountered while exporting logs batch. "
+                    "No Retry-After header found and all backoff retries exhausted. "
+                    "Logs will not be exported.",
+                    resp.reason,
+                )
+                self._gen_ai_flag = False
+                return LogExportResult.FAILURE
+
+            _logger.warning(
+                "Transient error %s encountered while exporting logs batch, retrying in %ss.",
+                resp.reason,
+                delay,
+            )
+
+            sleep(delay)
+
+    def set_gen_ai_flag(self):
+        """
+        Sets the gen_ai flag to true to signal injecting the LLO flag into the headers of the export request.
+        """
+        self._gen_ai_flag = True
+
+    def _send(self, serialized_data: bytes):
+        try:
+            return self._session.post(
+                url=self._endpoint,
+                headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None,
+                data=serialized_data,
+                verify=self._certificate_file,
+                timeout=self._timeout,
+                cert=self._client_cert,
+            )
+        except ConnectionError:
+            # Retry the request once if the first attempt raises a ConnectionError.
+            return self._session.post(
+                url=self._endpoint,
+                headers=self._LARGE_LOG_HEADER if self._gen_ai_flag else None,
+                data=serialized_data,
+                verify=self._certificate_file,
+                timeout=self._timeout,
+                cert=self._client_cert,
+            )
+
+    @staticmethod
+    def _retryable(resp: requests.Response) -> bool:
+        """
+        Is it a retryable response?
+        """
+        if resp.status_code in (429, 503):
+            return True
+
+        return OTLPLogExporter._retryable(resp)
+
+    @staticmethod
+    def _parse_retryable_header(retry_header: Optional[str]) -> float:
+        """
+        Converts the given retryable header into a delay in seconds, returns -1 if there's no header
+        or an error with the parsing.
+        """
+        if not retry_header:
+            return -1
+
+        try:
+            val = float(retry_header)
+            return val if val >= 0 else -1
+        except ValueError:
+            return -1
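Similarly, a quick illustration of the Retry-After parsing helper shown above:

```python
from amazon.opentelemetry.distro.exporter.otlp.aws.logs.otlp_aws_logs_exporter import OTLPAwsLogExporter

# A numeric Retry-After header value becomes the next retry delay, in seconds.
assert OTLPAwsLogExporter._parse_retryable_header("7") == 7.0

# A missing or malformed header yields -1, which tells export() to fall back
# to the next value from the exponential backoff generator instead.
assert OTLPAwsLogExporter._parse_retryable_header(None) == -1
assert OTLPAwsLogExporter._parse_retryable_header("soon") == -1
```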
