Skip to content

Commit de4b61b

Browse files
authored
feat: Extract resource arn and remote resource access key for cross-account support (#396)
**Description of changes:** 1. Add auto-instrumentation support for the following AWS resources. - Populate `aws.stream.arn` in Span by extracting StreamArn from the request body. - Populate `aws.table.arn` in Span by extracting TableArn from the response body. 2. Add auto-instrumentation support for remote resource access key. - Populate `aws.auth.account.access_key` and `aws.auth.region` in Span from STS credentials in client instance 3. Generate cross-account metrics attributes when remote resource identifier is present - If remote resource arn is available, extract account id and region from arn. - Otherwise, pass account access key id and region from span to metric if available. 4. Add unit tests, contract tests. Done E2E tests with CW agent. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.
2 parents c6d8143 + 5bc41ee commit de4b61b

File tree

13 files changed

+1108
-111
lines changed

13 files changed

+1108
-111
lines changed

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_attribute_keys.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
# AWS_#_NAME attributes are not supported in python as they are not part of the Semantic Conventions.
2020
# TODO:Move to Semantic Conventions when these attributes are added.
21+
AWS_AUTH_ACCESS_KEY: str = "aws.auth.account.access_key"
22+
AWS_AUTH_REGION: str = "aws.auth.region"
2123
AWS_SQS_QUEUE_URL: str = "aws.sqs.queue.url"
2224
AWS_SQS_QUEUE_NAME: str = "aws.sqs.queue.name"
25+
AWS_KINESIS_STREAM_ARN: str = "aws.kinesis.stream.arn"
2326
AWS_KINESIS_STREAM_NAME: str = "aws.kinesis.stream.name"
2427
AWS_BEDROCK_DATA_SOURCE_ID: str = "aws.bedrock.data_source.id"
2528
AWS_BEDROCK_KNOWLEDGE_BASE_ID: str = "aws.bedrock.knowledge_base.id"
@@ -33,4 +36,8 @@
3336
AWS_LAMBDA_FUNCTION_NAME: str = "aws.lambda.function.name"
3437
AWS_LAMBDA_RESOURCEMAPPING_ID: str = "aws.lambda.resource_mapping.id"
3538
AWS_LAMBDA_FUNCTION_ARN: str = "aws.lambda.function.arn"
39+
AWS_DYNAMODB_TABLE_ARN: str = "aws.dynamodb.table.arn"
40+
AWS_REMOTE_RESOURCE_ACCESS_KEY: str = "aws.remote.resource.account.access_key"
41+
AWS_REMOTE_RESOURCE_ACCOUNT_ID: str = "aws.remote.resource.account.id"
42+
AWS_REMOTE_RESOURCE_REGION: str = "aws.remote.resource.region"
3643
AWS_SERVICE_TYPE: str = "aws.service.type"

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_aws_metric_attribute_generator.py

Lines changed: 87 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,16 @@
77
from urllib.parse import ParseResult, urlparse
88

99
from amazon.opentelemetry.distro._aws_attribute_keys import (
10+
AWS_AUTH_ACCESS_KEY,
11+
AWS_AUTH_REGION,
1012
AWS_BEDROCK_AGENT_ID,
1113
AWS_BEDROCK_DATA_SOURCE_ID,
1214
AWS_BEDROCK_GUARDRAIL_ARN,
1315
AWS_BEDROCK_GUARDRAIL_ID,
1416
AWS_BEDROCK_KNOWLEDGE_BASE_ID,
1517
AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER,
18+
AWS_DYNAMODB_TABLE_ARN,
19+
AWS_KINESIS_STREAM_ARN,
1620
AWS_KINESIS_STREAM_NAME,
1721
AWS_LAMBDA_FUNCTION_ARN,
1822
AWS_LAMBDA_FUNCTION_NAME,
@@ -22,7 +26,10 @@
2226
AWS_REMOTE_DB_USER,
2327
AWS_REMOTE_ENVIRONMENT,
2428
AWS_REMOTE_OPERATION,
29+
AWS_REMOTE_RESOURCE_ACCESS_KEY,
30+
AWS_REMOTE_RESOURCE_ACCOUNT_ID,
2531
AWS_REMOTE_RESOURCE_IDENTIFIER,
32+
AWS_REMOTE_RESOURCE_REGION,
2633
AWS_REMOTE_RESOURCE_TYPE,
2734
AWS_REMOTE_SERVICE,
2835
AWS_SECRETSMANAGER_SECRET_ARN,
@@ -56,6 +63,7 @@
5663
SERVICE_METRIC,
5764
MetricAttributeGenerator,
5865
)
66+
from amazon.opentelemetry.distro.regional_resource_arn_parser import RegionalResourceArnParser
5967
from amazon.opentelemetry.distro.sqs_url_parser import SqsUrlParser
6068
from opentelemetry.sdk.resources import Resource
6169
from opentelemetry.sdk.trace import BoundedAttributes, ReadableSpan
@@ -148,7 +156,11 @@ def _generate_dependency_metric_attributes(span: ReadableSpan, resource: Resourc
148156
_set_service(resource, span, attributes)
149157
_set_egress_operation(span, attributes)
150158
_set_remote_service_and_operation(span, attributes)
151-
_set_remote_type_and_identifier(span, attributes)
159+
is_remote_identifier_present = _set_remote_type_and_identifier(span, attributes)
160+
if is_remote_identifier_present:
161+
is_remote_account_id_present = _set_remote_account_id_and_region(span, attributes)
162+
if not is_remote_account_id_present:
163+
_set_remote_access_key_and_region(span, attributes)
152164
_set_remote_environment(span, attributes)
153165
_set_remote_db_user(span, attributes)
154166
_set_span_kind_for_dependency(span, attributes)
@@ -383,7 +395,7 @@ def _generate_remote_operation(span: ReadableSpan) -> str:
383395

384396

385397
# pylint: disable=too-many-branches,too-many-statements
386-
def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttributes) -> None:
398+
def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttributes) -> bool:
387399
"""
388400
Remote resource attributes {@link AwsAttributeKeys#AWS_REMOTE_RESOURCE_TYPE} and {@link
389401
AwsAttributeKeys#AWS_REMOTE_RESOURCE_IDENTIFIER} are used to store information about the resource associated with
@@ -403,9 +415,23 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
403415
if is_key_present(span, _AWS_TABLE_NAMES) and len(span.attributes.get(_AWS_TABLE_NAMES)) == 1:
404416
remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table"
405417
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_TABLE_NAMES)[0])
418+
elif is_key_present(span, AWS_DYNAMODB_TABLE_ARN):
419+
remote_resource_type = _NORMALIZED_DYNAMO_DB_SERVICE_NAME + "::Table"
420+
remote_resource_identifier = _escape_delimiters(
421+
RegionalResourceArnParser.extract_dynamodb_table_name_from_arn(
422+
span.attributes.get(AWS_DYNAMODB_TABLE_ARN)
423+
)
424+
)
406425
elif is_key_present(span, AWS_KINESIS_STREAM_NAME):
407426
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
408427
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_KINESIS_STREAM_NAME))
428+
elif is_key_present(span, AWS_KINESIS_STREAM_ARN):
429+
remote_resource_type = _NORMALIZED_KINESIS_SERVICE_NAME + "::Stream"
430+
remote_resource_identifier = _escape_delimiters(
431+
RegionalResourceArnParser.extract_kinesis_stream_name_from_arn(
432+
span.attributes.get(AWS_KINESIS_STREAM_ARN)
433+
)
434+
)
409435
elif is_key_present(span, _AWS_BUCKET_NAME):
410436
remote_resource_type = _NORMALIZED_S3_SERVICE_NAME + "::Bucket"
411437
remote_resource_identifier = _escape_delimiters(span.attributes.get(_AWS_BUCKET_NAME))
@@ -442,27 +468,35 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
442468
remote_resource_identifier = _escape_delimiters(span.attributes.get(GEN_AI_REQUEST_MODEL))
443469
elif is_key_present(span, AWS_SECRETSMANAGER_SECRET_ARN):
444470
remote_resource_type = _NORMALIZED_SECRETSMANAGER_SERVICE_NAME + "::Secret"
445-
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN)).split(
446-
":"
447-
)[-1]
471+
remote_resource_identifier = _escape_delimiters(
472+
RegionalResourceArnParser.extract_resource_name_from_arn(
473+
span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN)
474+
)
475+
)
448476
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_SECRETSMANAGER_SECRET_ARN))
449477
elif is_key_present(span, AWS_SNS_TOPIC_ARN):
450478
remote_resource_type = _NORMALIZED_SNS_SERVICE_NAME + "::Topic"
451-
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_SNS_TOPIC_ARN)).split(":")[-1]
479+
remote_resource_identifier = _escape_delimiters(
480+
RegionalResourceArnParser.extract_resource_name_from_arn(span.attributes.get(AWS_SNS_TOPIC_ARN))
481+
)
452482
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_SNS_TOPIC_ARN))
453483
elif is_key_present(span, AWS_STEPFUNCTIONS_STATEMACHINE_ARN):
454484
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::StateMachine"
455485
remote_resource_identifier = _escape_delimiters(
456-
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
457-
).split(":")[-1]
486+
RegionalResourceArnParser.extract_resource_name_from_arn(
487+
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
488+
)
489+
)
458490
cloudformation_primary_identifier = _escape_delimiters(
459491
span.attributes.get(AWS_STEPFUNCTIONS_STATEMACHINE_ARN)
460492
)
461493
elif is_key_present(span, AWS_STEPFUNCTIONS_ACTIVITY_ARN):
462494
remote_resource_type = _NORMALIZED_STEPFUNCTIONS_SERVICE_NAME + "::Activity"
463-
remote_resource_identifier = _escape_delimiters(span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN)).split(
464-
":"
465-
)[-1]
495+
remote_resource_identifier = _escape_delimiters(
496+
RegionalResourceArnParser.extract_resource_name_from_arn(
497+
span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN)
498+
)
499+
)
466500
cloudformation_primary_identifier = _escape_delimiters(span.attributes.get(AWS_STEPFUNCTIONS_ACTIVITY_ARN))
467501
elif is_key_present(span, AWS_LAMBDA_FUNCTION_NAME):
468502
# For non-Invoke Lambda operations, treat Lambda as a resource,
@@ -491,6 +525,48 @@ def _set_remote_type_and_identifier(span: ReadableSpan, attributes: BoundedAttri
491525
attributes[AWS_REMOTE_RESOURCE_TYPE] = remote_resource_type
492526
attributes[AWS_REMOTE_RESOURCE_IDENTIFIER] = remote_resource_identifier
493527
attributes[AWS_CLOUDFORMATION_PRIMARY_IDENTIFIER] = cloudformation_primary_identifier
528+
return True
529+
return False
530+
531+
532+
def _set_remote_account_id_and_region(span: ReadableSpan, attributes: BoundedAttributes) -> bool:
533+
arn_attributes = [
534+
AWS_DYNAMODB_TABLE_ARN,
535+
AWS_KINESIS_STREAM_ARN,
536+
AWS_SNS_TOPIC_ARN,
537+
AWS_SECRETSMANAGER_SECRET_ARN,
538+
AWS_STEPFUNCTIONS_STATEMACHINE_ARN,
539+
AWS_STEPFUNCTIONS_ACTIVITY_ARN,
540+
AWS_BEDROCK_GUARDRAIL_ARN,
541+
AWS_LAMBDA_FUNCTION_ARN,
542+
]
543+
remote_account_id: Optional[str] = None
544+
remote_region: Optional[str] = None
545+
546+
if is_key_present(span, AWS_SQS_QUEUE_URL):
547+
queue_url = _escape_delimiters(span.attributes.get(AWS_SQS_QUEUE_URL))
548+
remote_account_id = SqsUrlParser.get_account_id(queue_url)
549+
remote_region = SqsUrlParser.get_region(queue_url)
550+
else:
551+
for arn_attribute in arn_attributes:
552+
if is_key_present(span, arn_attribute):
553+
arn = span.attributes.get(arn_attribute)
554+
remote_account_id = RegionalResourceArnParser.get_account_id(arn)
555+
remote_region = RegionalResourceArnParser.get_region(arn)
556+
break
557+
558+
if remote_account_id is not None and remote_region is not None:
559+
attributes[AWS_REMOTE_RESOURCE_ACCOUNT_ID] = remote_account_id
560+
attributes[AWS_REMOTE_RESOURCE_REGION] = remote_region
561+
return True
562+
return False
563+
564+
565+
def _set_remote_access_key_and_region(span: ReadableSpan, attributes: BoundedAttributes) -> None:
566+
if is_key_present(span, AWS_AUTH_ACCESS_KEY):
567+
attributes[AWS_REMOTE_RESOURCE_ACCESS_KEY] = span.attributes.get(AWS_AUTH_ACCESS_KEY)
568+
if is_key_present(span, AWS_AUTH_REGION):
569+
attributes[AWS_REMOTE_RESOURCE_REGION] = span.attributes.get(AWS_AUTH_REGION)
494570

495571

496572
def _set_remote_environment(span: ReadableSpan, attributes: BoundedAttributes) -> None:

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/_utils.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,7 @@ def get_aws_region() -> Optional[str]:
7575
"""
7676
botocore_session = get_aws_session()
7777
return botocore_session.get_config_variable("region") if botocore_session else None
78+
79+
80+
def is_account_id(input_str: str) -> bool:
81+
return input_str is not None and input_str.isdigit()

aws-opentelemetry-distro/src/amazon/opentelemetry/distro/patches/_botocore_patches.py

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33
# Modifications Copyright The OpenTelemetry Authors. Licensed under the Apache License 2.0 License.
44
import importlib
55

6+
from botocore.exceptions import ClientError
7+
68
from amazon.opentelemetry.distro._aws_attribute_keys import (
9+
AWS_AUTH_ACCESS_KEY,
10+
AWS_AUTH_REGION,
11+
AWS_DYNAMODB_TABLE_ARN,
12+
AWS_KINESIS_STREAM_ARN,
713
AWS_KINESIS_STREAM_NAME,
814
AWS_LAMBDA_FUNCTION_ARN,
915
AWS_LAMBDA_FUNCTION_NAME,
@@ -20,7 +26,14 @@
2026
_BedrockAgentRuntimeExtension,
2127
_BedrockExtension,
2228
)
23-
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS
29+
from opentelemetry.instrumentation.botocore import (
30+
BotocoreInstrumentor,
31+
_apply_response_attributes,
32+
_determine_call_context,
33+
_safe_invoke,
34+
)
35+
from opentelemetry.instrumentation.botocore.extensions import _KNOWN_EXTENSIONS, _find_extension
36+
from opentelemetry.instrumentation.botocore.extensions.dynamodb import _DynamoDbExtension
2437
from opentelemetry.instrumentation.botocore.extensions.lmbd import _LambdaExtension
2538
from opentelemetry.instrumentation.botocore.extensions.sns import _SnsExtension
2639
from opentelemetry.instrumentation.botocore.extensions.sqs import _SqsExtension
@@ -30,6 +43,8 @@
3043
_BotocoreInstrumentorContext,
3144
_BotoResultT,
3245
)
46+
from opentelemetry.instrumentation.botocore.utils import get_server_attributes
47+
from opentelemetry.instrumentation.utils import is_instrumentation_enabled, suppress_http_instrumentation
3348
from opentelemetry.semconv.trace import SpanAttributes
3449
from opentelemetry.trace.span import Span
3550

@@ -39,6 +54,7 @@ def _apply_botocore_instrumentation_patches() -> None:
3954
4055
Adds patches to provide additional support and Java parity for Kinesis, S3, and SQS.
4156
"""
57+
_apply_botocore_api_call_patch()
4258
_apply_botocore_kinesis_patch()
4359
_apply_botocore_s3_patch()
4460
_apply_botocore_sqs_patch()
@@ -47,6 +63,7 @@ def _apply_botocore_instrumentation_patches() -> None:
4763
_apply_botocore_sns_patch()
4864
_apply_botocore_stepfunctions_patch()
4965
_apply_botocore_lambda_patch()
66+
_apply_botocore_dynamodb_patch()
5067

5168

5269
def _apply_botocore_lambda_patch() -> None:
@@ -208,6 +225,115 @@ def _apply_botocore_bedrock_patch() -> None:
208225
# bedrock-runtime is handled by upstream
209226

210227

228+
def _apply_botocore_dynamodb_patch() -> None:
229+
"""Botocore instrumentation patch for DynamoDB
230+
231+
This patch adds an extension to the upstream's list of known extensions for DynamoDB.
232+
Extensions allow for custom logic for adding service-specific information to
233+
spans, such as attributes. Specifically, we are adding logic to add the
234+
`aws.table.arn` attribute, to be used to generate RemoteTarget and achieve
235+
parity with the Java instrumentation.
236+
"""
237+
old_on_success = _DynamoDbExtension.on_success
238+
239+
def patch_on_success(self, span: Span, result: _BotoResultT, instrumentor_context: _BotocoreInstrumentorContext):
240+
old_on_success(self, span, result, instrumentor_context)
241+
table = result.get("Table", {})
242+
table_arn = table.get("TableArn")
243+
if table_arn:
244+
span.set_attribute(AWS_DYNAMODB_TABLE_ARN, table_arn)
245+
246+
_DynamoDbExtension.on_success = patch_on_success
247+
248+
249+
def _apply_botocore_api_call_patch() -> None:
250+
# pylint: disable=too-many-locals
251+
def patched_api_call(self, original_func, instance, args, kwargs):
252+
"""Botocore instrumentation patch to capture AWS authentication details
253+
254+
This patch extends the upstream implementation to include additional AWS authentication
255+
attributes:
256+
- aws.auth.account.access_key
257+
- aws.auth.region
258+
259+
Note: Current implementation duplicates upstream code in v1.33.x-0.54bx. Future improvements should:
260+
1. Propose refactoring upstream _patched_api_call into smaller components
261+
2. Apply targeted patches to these components to reduce code duplication
262+
263+
Reference: https://github.com/open-telemetry/opentelemetry-python-contrib/blob/
264+
release/v1.33.x-0.54bx/instrumentation/opentelemetry-instrumentation-botocore/src/
265+
opentelemetry/instrumentation/botocore/__init__.py#L263
266+
"""
267+
if not is_instrumentation_enabled():
268+
return original_func(*args, **kwargs)
269+
270+
call_context = _determine_call_context(instance, args)
271+
if call_context is None:
272+
return original_func(*args, **kwargs)
273+
274+
extension = _find_extension(call_context)
275+
if not extension.should_trace_service_call():
276+
return original_func(*args, **kwargs)
277+
278+
attributes = {
279+
SpanAttributes.RPC_SYSTEM: "aws-api",
280+
SpanAttributes.RPC_SERVICE: call_context.service_id,
281+
SpanAttributes.RPC_METHOD: call_context.operation,
282+
# TODO: update when semantic conventions exist
283+
"aws.region": call_context.region,
284+
**get_server_attributes(call_context.endpoint_url),
285+
AWS_AUTH_REGION: call_context.region,
286+
}
287+
288+
credentials = instance._get_credentials()
289+
if credentials is not None:
290+
access_key = credentials.access_key
291+
if access_key is not None:
292+
attributes[AWS_AUTH_ACCESS_KEY] = access_key
293+
294+
_safe_invoke(extension.extract_attributes, attributes)
295+
end_span_on_exit = extension.should_end_span_on_exit()
296+
297+
tracer = self._get_tracer(extension)
298+
event_logger = self._get_event_logger(extension)
299+
meter = self._get_meter(extension)
300+
metrics = self._get_metrics(extension, meter)
301+
instrumentor_ctx = _BotocoreInstrumentorContext(
302+
event_logger=event_logger,
303+
metrics=metrics,
304+
)
305+
with tracer.start_as_current_span(
306+
call_context.span_name,
307+
kind=call_context.span_kind,
308+
attributes=attributes,
309+
# tracing streaming services require to close the span manually
310+
# at a later time after the stream has been consumed
311+
end_on_exit=end_span_on_exit,
312+
) as span:
313+
_safe_invoke(extension.before_service_call, span, instrumentor_ctx)
314+
self._call_request_hook(span, call_context)
315+
316+
try:
317+
with suppress_http_instrumentation():
318+
result = None
319+
try:
320+
result = original_func(*args, **kwargs)
321+
except ClientError as error:
322+
result = getattr(error, "response", None)
323+
_apply_response_attributes(span, result)
324+
_safe_invoke(extension.on_error, span, error, instrumentor_ctx)
325+
raise
326+
_apply_response_attributes(span, result)
327+
_safe_invoke(extension.on_success, span, result, instrumentor_ctx)
328+
finally:
329+
_safe_invoke(extension.after_service_call, instrumentor_ctx)
330+
self._call_response_hook(span, call_context, result)
331+
332+
return result
333+
334+
BotocoreInstrumentor._patched_api_call = patched_api_call
335+
336+
211337
# The OpenTelemetry Authors code
212338
def _lazy_load(module, cls):
213339
"""Clone of upstream opentelemetry.instrumentation.botocore.extensions.lazy_load
@@ -265,3 +391,6 @@ def extract_attributes(self, attributes: _AttributeMapT):
265391
stream_name = self._call_context.params.get("StreamName")
266392
if stream_name:
267393
attributes[AWS_KINESIS_STREAM_NAME] = stream_name
394+
stream_arn = self._call_context.params.get("StreamARN")
395+
if stream_arn:
396+
attributes[AWS_KINESIS_STREAM_ARN] = stream_arn

0 commit comments

Comments
 (0)