diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/build.gradle b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/build.gradle new file mode 100644 index 00000000000..1fa9dfbeb41 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/build.gradle @@ -0,0 +1,32 @@ +muzzle { + pass { + group = "software.amazon.awssdk" + module = "eventbridge" + versions = "[2.7,3)" + assertInverse = true + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') +addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') + +dependencies { + compileOnly group: 'software.amazon.awssdk', name: 'eventbridge', version: '2.27.19' + + // Include httpclient instrumentation for testing because it is a dependency for aws-sdk. + testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') + testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') + testImplementation 'software.amazon.awssdk:eventbridge:2.27.23' + // SQS and SNS are used to act as the "targets" of the EB bus. + testImplementation 'software.amazon.awssdk:sqs:2.27.23' + testImplementation 'software.amazon.awssdk:sns:2.27.23' + testImplementation 'org.testcontainers:localstack:1.20.1' + + latestDepTestImplementation group: 'software.amazon.awssdk', name: 'eventbridge', version: '+' +} + +tasks.withType(Test).configureEach { + usesService(testcontainersLimit) +} diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeClientInstrumentation.java new file mode 100644 index 00000000000..b50acc0ad5c --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeClientInstrumentation.java @@ -0,0 +1,50 @@ +package datadog.trace.instrumentation.aws.v2.eventbridge; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.List; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; + +@AutoService(InstrumenterModule.class) +public final class EventBridgeClientInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + public EventBridgeClientInstrumentation() { + super("eventbridge"); + } + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("resolveExecutionInterceptors")), + EventBridgeClientInstrumentation.class.getName() + "$AwsEventBridgeBuilderAdvice"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".EventBridgeInterceptor", packageName + ".TextMapInjectAdapter" + }; + } + + public static class AwsEventBridgeBuilderAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void addHandler(@Advice.Return final List interceptors) { + for (ExecutionInterceptor interceptor : interceptors) { + if (interceptor instanceof EventBridgeInterceptor) { + return; // list already has our interceptor, return to builder + } + } + interceptors.add(new EventBridgeInterceptor()); + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java new file mode 100644 index 00000000000..b1be20cccc6 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/EventBridgeInterceptor.java @@ -0,0 +1,121 @@ +package datadog.trace.instrumentation.aws.v2.eventbridge; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig; +import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT; +import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG; +import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; +import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER; + +import datadog.trace.bootstrap.InstanceStore; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.PathwayContext; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttribute; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest; +import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry; + +public class EventBridgeInterceptor implements ExecutionInterceptor { + private static final Logger log = LoggerFactory.getLogger(EventBridgeInterceptor.class); + + public static final ExecutionAttribute SPAN_ATTRIBUTE = + InstanceStore.of(ExecutionAttribute.class) + .putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan")); + + private static final String START_TIME_KEY = "x-datadog-start-time"; + private static final String RESOURCE_NAME_KEY = "x-datadog-resource-name"; + + @Override + public SdkRequest modifyRequest( + Context.ModifyRequest context, ExecutionAttributes executionAttributes) { + if (!(context.request() instanceof PutEventsRequest)) { + return context.request(); + } + + PutEventsRequest request = (PutEventsRequest) context.request(); + List modifiedEntries = new ArrayList<>(request.entries().size()); + long startTime = System.currentTimeMillis(); + + for (PutEventsRequestEntry entry : request.entries()) { + StringBuilder detailBuilder = new StringBuilder(entry.detail().trim()); + if (detailBuilder.length() == 0) { + detailBuilder.append("{}"); + } + if (detailBuilder.charAt(detailBuilder.length() - 1) != '}') { + log.debug( + "Unable to parse detail JSON. Not injecting trace context into EventBridge payload."); + modifiedEntries.add(entry); // Add the original entry without modification + continue; + } + + String traceContext = + getTraceContextToInject(executionAttributes, entry.eventBusName(), startTime); + detailBuilder.setLength(detailBuilder.length() - 1); // Remove the last bracket + if (detailBuilder.length() > 1) { + detailBuilder.append(", "); // Only add a comma if detail is not empty. + } + + detailBuilder + .append("\"") + .append(PathwayContext.DATADOG_KEY) + .append("\": ") + .append(traceContext) + .append('}'); + + String modifiedDetail = detailBuilder.toString(); + PutEventsRequestEntry modifiedEntry = entry.toBuilder().detail(modifiedDetail).build(); + modifiedEntries.add(modifiedEntry); + } + + return request.toBuilder().entries(modifiedEntries).build(); + } + + private String getTraceContextToInject( + ExecutionAttributes executionAttributes, String eventBusName, long startTime) { + final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append('{'); + + // Inject trace context + propagate().inject(span, jsonBuilder, SETTER); + + if (traceConfig().isDataStreamsEnabled()) { + propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(eventBusName)); + } + + // Add bus name and start time + jsonBuilder + .append(" \"") + .append(START_TIME_KEY) + .append("\": \"") + .append(startTime) + .append("\", "); + jsonBuilder + .append(" \"") + .append(RESOURCE_NAME_KEY) + .append("\": \"") + .append(eventBusName) + .append("\""); + + jsonBuilder.append('}'); + return jsonBuilder.toString(); + } + + private LinkedHashMap getTags(String eventBusName) { + LinkedHashMap sortedTags = new LinkedHashMap<>(); + sortedTags.put(DIRECTION_TAG, DIRECTION_OUT); + sortedTags.put(BUS_TAG, eventBusName); + sortedTags.put(TYPE_TAG, "bus"); + + return sortedTags; + } +} diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/TextMapInjectAdapter.java new file mode 100644 index 00000000000..dd03362ca55 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/eventbridge/TextMapInjectAdapter.java @@ -0,0 +1,13 @@ +package datadog.trace.instrumentation.aws.v2.eventbridge; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; + +public class TextMapInjectAdapter implements AgentPropagation.Setter { + + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(final StringBuilder builder, final String key, final String value) { + builder.append('"').append(key).append("\":\"").append(value).append("\","); + } +} diff --git a/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/test/groovy/EventBridgeClientTest.groovy b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/test/groovy/EventBridgeClientTest.groovy new file mode 100644 index 00000000000..60b2d63ecb6 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-eventbridge-2.0/src/test/groovy/EventBridgeClientTest.groovy @@ -0,0 +1,498 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.config.GeneralConfig +import groovy.json.JsonSlurper +import org.testcontainers.containers.GenericContainer +import org.testcontainers.utility.DockerImageName +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.eventbridge.EventBridgeAsyncClient +import software.amazon.awssdk.services.eventbridge.EventBridgeClient +import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry +import software.amazon.awssdk.services.eventbridge.model.PutEventsResponse +import software.amazon.awssdk.services.eventbridge.model.Target +import software.amazon.awssdk.services.sns.SnsClient +import software.amazon.awssdk.services.sqs.SqsClient +import software.amazon.awssdk.services.sqs.model.QueueAttributeName +import spock.lang.Shared + +import java.time.Duration +import java.util.concurrent.CompletableFuture + +class EventBridgeClientTest extends AgentTestRunner { + static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack")) + .withExposedPorts(4566) + .withEnv("SERVICES", "sns,sqs,events") + .withReuse(true) + .withStartupTimeout(Duration.ofSeconds(120)) + + @Shared + SnsClient snsClient + @Shared + String testTopicARN + @Shared + String testTopicName + + @Shared + EventBridgeClient eventBridgeClient + @Shared + EventBridgeAsyncClient eventBridgeAsyncClient + @Shared + String testBusARN + @Shared + String testBusName + @Shared + String testRuleName + + @Shared + SqsClient sqsClient + @Shared + String testQueueURL + @Shared + String testQueueARN + + def setupSpec() { + LOCALSTACK.start() + def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) + + eventBridgeClient = EventBridgeClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + + eventBridgeAsyncClient = EventBridgeAsyncClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + + snsClient = SnsClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + + sqsClient = SqsClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + + // Create SNS topic for EventBridge -> SNS tests + testTopicName = "testtopic" + testTopicARN = snsClient.createTopic { it.name(testTopicName) }.topicArn() + + // Create EventBridge bus + testBusName = "testbus" + testBusARN = eventBridgeClient.createEventBus { it.name(testBusName) }.eventBusArn() + + // Create EventBridge rule + testRuleName = "testrule" + eventBridgeClient.putRule { + it.name(testRuleName) + .eventBusName(testBusName) + .eventPattern("{\"source\": [{\"prefix\": \"com.example\"}]}") + } + + // Create SQS queue for EventBridge -> SQS tests + testQueueURL = sqsClient.createQueue { it.queueName("testqueue") }.queueUrl() + testQueueARN = sqsClient.getQueueAttributes { + it.queueUrl(testQueueURL).attributeNames(QueueAttributeName.QUEUE_ARN) + }.attributes().get(QueueAttributeName.QUEUE_ARN) + + // Set up EventBridge rule targets + eventBridgeClient.putTargets { req -> + req.rule(testRuleName) + .eventBusName(testBusName) + .targets( + Target.builder().id("1").arn(testQueueARN).build(), + Target.builder().id("2").arn(testTopicARN).build() + ) + } + } + + def setup() { + sqsClient.purgeQueue { it.queueUrl(testQueueURL) } + } + + def cleanupSpec() { + LOCALSTACK.stop() + } + + @Override + protected void configurePreAgent() { + super.configurePreAgent() + injectSysConfig(GeneralConfig.SERVICE_NAME, "eventbridge") + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + + // test propagation styles + injectSysConfig('dd.trace.propagation.style', 'datadog,b3single,b3multi,haystack,xray,tracecontext') + } + + def "trace details propagated via EventBridge to SQS (sync)"() { + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test") + .detail('{"message":"sometext"}') + .eventBusName(testBusARN) + .build() + ) + } + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + def detail = messageBody["detail"] + assert detail instanceof Map + assert detail["message"] == "sometext" + + def traceContext = detail["_datadog"] + assert traceContext["x-datadog-trace-id"] != null + assert traceContext["x-datadog-trace-id"].toString().isNumber() + assert traceContext["x-datadog-parent-id"] != null + assert traceContext["x-datadog-parent-id"].toString().isNumber() + assert traceContext["x-datadog-sampling-priority"] == "1" + assert traceContext["x-datadog-start-time"] != null + assert traceContext["x-datadog-resource-name"] != null + + assert messageBody["source"] == "com.example" + assert messageBody["detail-type"] == "test" + } + + def "trace details propagated via EventBridge to SQS (async)"() { + when: + TEST_WRITER.clear() + CompletableFuture futureResponse = eventBridgeAsyncClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test-async") + .detail('{"message":"async-text"}') + .eventBusName(testBusARN) + .build() + ) + } + futureResponse.get() // Wait for async operation to complete + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + def detail = messageBody["detail"] + assert detail instanceof Map + assert detail["message"] == "async-text" + + def traceContext = detail["_datadog"] + assert traceContext["x-datadog-trace-id"].toString().isNumber() + assert traceContext["x-datadog-parent-id"].toString().isNumber() + assert traceContext["x-datadog-sampling-priority"] == "1" + assert traceContext["x-datadog-start-time"] != null + assert traceContext["x-datadog-resource-name"] != null + + assert messageBody["source"] == "com.example" + assert messageBody["detail-type"] == "test-async" + } + + def "trace details propagated via EventBridge to SNS"() { + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test") + .detail('{"message":"sns-test"}') + .eventBusName(testBusARN) + .build() + ) + } + + then: + // Unlike SQS, there's no `receiveMessage()` or similar function for SnsClient, + // so we can't test the detail contents but we can test the span's fields. + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + } + + def "test sending multiple events in a single PutEvents request (sync)"() { + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test1") + .detail('{"message":"event1"}') + .eventBusName(testBusARN) + .build(), + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test2") + .detail('{"message":"event2"}') + .eventBusName(testBusARN) + .build() + ) + } + + def messages = sqsClient.receiveMessage { it.queueUrl(testQueueURL).maxNumberOfMessages(2).waitTimeSeconds(5) }.messages() + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + assert messages.size() == 2 + messages.every { message -> + def body = new JsonSlurper().parseText(message.body()) + body["detail"]["message"].toString().contains("event") && + body["detail"]["_datadog"] != null && + body["detail"]["_datadog"]["x-datadog-trace-id"] != null && + body["detail"]["_datadog"]["x-datadog-parent-id"] != null + } + } + + def "test sending multiple events in a single PutEvents request (async)"() { + when: + TEST_WRITER.clear() + CompletableFuture futureResponse = eventBridgeAsyncClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test1-async") + .detail('{"message":"event1-async"}') + .eventBusName(testBusARN) + .build(), + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test2-async") + .detail('{"message":"event2-async"}') + .eventBusName(testBusARN) + .build() + ) + } + futureResponse.get() // Wait for async operation to complete + + def messages = sqsClient.receiveMessage { it.queueUrl(testQueueURL).maxNumberOfMessages(2).waitTimeSeconds(5) }.messages() + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + assert messages.size() == 2 + messages.every { message -> + def body = new JsonSlurper().parseText(message.body()) + body["detail"]["message"].toString().contains("event") && + body["detail"]["_datadog"] != null && + body["detail"]["_datadog"]["x-datadog-trace-id"] != null && + body["detail"]["_datadog"]["x-datadog-parent-id"] != null + } + } + + def "test with nested details"() { + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test") + .detail('{"nested":{"nested_again":{"key1":"value1","key2":42}},"array":[1,2,3]}') + .eventBusName(testBusARN) + .build() + ) + } + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + def detail = messageBody["detail"] + assert detail["nested"]["nested_again"]["key1"] == "value1" + assert detail["nested"]["nested_again"]["key2"] == 42 + assert detail["array"] == [1, 2, 3] + assert detail["_datadog"] != null + assert detail["_datadog"]["x-datadog-start-time"] != null + assert detail["_datadog"]["x-datadog-resource-name"] != null + } + + def "test behavior when data streams are disabled"() { + setup: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "false") + + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test") + .detail('{"message":"data streams disabled"}') + .eventBusName(testBusARN) + .build() + ) + } + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + + assert messageBody["detail"]["message"] == "data streams disabled" + assert messageBody["detail"]["_datadog"]["x-datadog-trace-id"] != null + assert messageBody["detail"]["_datadog"]["x-datadog-parent-id"] != null + assert messageBody["detail"]["_datadog"]["x-datadog-tags"] != null + + cleanup: + injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, "true") + } + + def "test behavior with empty detail fields"() { + when: + TEST_WRITER.clear() + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test-empty") + .detail('{}') + .eventBusName(testBusARN) + .build(), + ) + } + + def messages = sqsClient.receiveMessage { it.queueUrl(testQueueURL).maxNumberOfMessages(2).waitTimeSeconds(5) }.messages() + + then: + assertTraces(1) { + trace(1) { + span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "EventBridge.PutEvents" + spanType DDSpanTypes.HTTP_CLIENT + } + } + } + assert messages.size() == 1 + + def message = messages[0] + assert message != null + def emptyDetailBody = new JsonSlurper().parseText(message.body()) + assert emptyDetailBody["detail"]["_datadog"] != null // Datadog context should be injected + assert emptyDetailBody["detail"]["_datadog"]["x-datadog-trace-id"] != null + assert emptyDetailBody["detail"]["_datadog"]["x-datadog-parent-id"] != null + assert emptyDetailBody["detail"]["_datadog"]["x-datadog-start-time"] != null + assert emptyDetailBody["detail"]["_datadog"]["x-datadog-resource-name"] != null + } + + def "test propagation styles"() { + when: + eventBridgeClient.putEvents { req -> + req.entries( + PutEventsRequestEntry.builder() + .source("com.example") + .detailType("test") + .detail('{"foo":"bar"}') + .eventBusName(testBusARN) + .build() + ) + } + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + def traceContext = messageBody["detail"]["_datadog"] + + then: + expectedHeaders.each { header -> + assert traceContext[header] != null + } + + where: + expectedHeaders = [ + 'x-datadog-trace-id', + 'x-datadog-parent-id', + 'x-datadog-sampling-priority', + 'b3', + 'X-B3-TraceId', + 'X-B3-SpanId', + 'Span-ID', + 'Parent-ID', + 'X-Amzn-Trace-Id', + 'traceparent', + 'tracestate' + ] + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java index c3e87028521..a8a8ee0785e 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java @@ -8,7 +8,6 @@ import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG; import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER; -import datadog.trace.api.TracePropagationStyle; import datadog.trace.bootstrap.InstanceStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import java.nio.charset.StandardCharsets; @@ -38,7 +37,7 @@ private SdkBytes getMessageAttributeValueToInject( final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); StringBuilder jsonBuilder = new StringBuilder(); jsonBuilder.append("{"); - propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG); + propagate().inject(span, jsonBuilder, SETTER); if (traceConfig().isDataStreamsEnabled()) { propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(snsTopicName)); } diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy index 12249bf2d7d..30f87bb1b7b 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy @@ -65,6 +65,9 @@ abstract class SnsClientTest extends VersionedNamingTestBase { // Set a service name that gets sorted early with SORT_BY_NAMES injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") injectSysConfig(GeneralConfig.DATA_STREAMS_ENABLED, isDataStreamsEnabled().toString()) + + // test propagation styles + injectSysConfig('dd.trace.propagation.style', 'datadog,b3single,b3multi,xray,tracecontext') } @Override @@ -188,6 +191,41 @@ abstract class SnsClientTest extends VersionedNamingTestBase { then: noExceptionThrown() } + + def "test propagation styles"() { + when: + TEST_WRITER.clear() + snsClient.publish { req -> + req.message("test message") + .topicArn(testTopicARN) + } + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + def messageBody = new JsonSlurper().parseText(message.body()) + + String base64EncodedString = messageBody["MessageAttributes"]["_datadog"]["Value"] + byte[] decodedBytes = base64EncodedString.decodeBase64() + String decodedString = new String(decodedBytes, "UTF-8") + def traceContext = new JsonSlurper().parseText(decodedString) + + then: + expectedHeaders.each { header -> + assert traceContext[header] != null, "Header $header is missing" + } + + where: + expectedHeaders = [ + 'x-datadog-trace-id', + 'x-datadog-parent-id', + 'x-datadog-sampling-priority', + 'b3', + 'X-B3-TraceId', + 'X-B3-SpanId', + 'X-Amzn-Trace-Id', + 'traceparent', + 'tracestate' + ] + } } class SnsClientV0Test extends SnsClientTest { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java index 6b679ba704d..1838b47239b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/TagsProcessor.java @@ -33,9 +33,14 @@ public String apply(String key) { private static final DDCache DIRECTION_TAG_CACHE = DDCaches.newFixedSizeCache(32); private static final Function DIRECTION_TAG_PREFIX = new StringPrefix("direction:"); + // SNS Topic public static final String TOPIC_TAG = "topic"; private static final DDCache TOPIC_TAG_CACHE = DDCaches.newFixedSizeCache(32); private static final Function TOPIC_TAG_PREFIX = new StringPrefix("topic:"); + // EventBridge Bus + public static final String BUS_TAG = "bus"; + private static final DDCache BUS_TAG_CACHE = DDCaches.newFixedSizeCache(32); + private static final Function BUS_TAG_PREFIX = new StringPrefix("bus:"); public static final String PARTITION_TAG = "partition"; private static final DDCache PARTITION_TAG_CACHE = DDCaches.newFixedSizeCache(32); @@ -90,6 +95,7 @@ private static Map> createTagToCacheMap() { result.put(TYPE_TAG, TYPE_TAG_CACHE); result.put(DIRECTION_TAG, DIRECTION_TAG_CACHE); result.put(TOPIC_TAG, TOPIC_TAG_CACHE); + result.put(BUS_TAG, BUS_TAG_CACHE); result.put(PARTITION_TAG, PARTITION_TAG_CACHE); result.put(GROUP_TAG, GROUP_TAG_CACHE); result.put(CONSUMER_GROUP_TAG, CONSUMER_GROUP_TAG_CACHE); @@ -107,6 +113,7 @@ private static Map> createTagToPrefixMap() { result.put(TYPE_TAG, TYPE_TAG_PREFIX); result.put(DIRECTION_TAG, DIRECTION_TAG_PREFIX); result.put(TOPIC_TAG, TOPIC_TAG_PREFIX); + result.put(BUS_TAG, BUS_TAG_PREFIX); result.put(PARTITION_TAG, PARTITION_TAG_PREFIX); result.put(GROUP_TAG, GROUP_TAG_PREFIX); result.put(CONSUMER_GROUP_TAG, CONSUMER_GROUP_TAG_PREFIX); diff --git a/settings.gradle b/settings.gradle index 418c5143502..3052776b82f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -180,6 +180,7 @@ include ':dd-java-agent:instrumentation:apache-httpcore-4' include ':dd-java-agent:instrumentation:armeria-grpc' include ':dd-java-agent:instrumentation:armeria-jetty' include ':dd-java-agent:instrumentation:avro' +include ':dd-java-agent:instrumentation:aws-java-eventbridge-2.0' include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0' include ':dd-java-agent:instrumentation:aws-java-sdk-2.2' include ':dd-java-agent:instrumentation:aws-java-sns-1.0'