Skip to content

Commit

Permalink
feat(dsm): Migrate DSM injection call to propagator API 1/3
Browse files Browse the repository at this point in the history
  • Loading branch information
PerfectSlayer committed Feb 3, 2025
1 parent 778cadd commit 3f7c076
Show file tree
Hide file tree
Showing 35 changed files with 192 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.AKKA_CLIENT_REQUEST;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientHelpers.AkkaHttpHeaders;
Expand All @@ -17,9 +17,9 @@
import datadog.context.propagation.Propagators;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import net.bytebuddy.asm.Advice;
import scala.concurrent.Future;

Expand Down Expand Up @@ -79,10 +79,8 @@ public static AgentScope methodEnter(
DECORATE.onRequest(span, request);

if (request != null) {
Propagators.defaultPropagator().inject(span, request, headers);
propagate()
.injectPathwayContext(
span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package datadog.trace.instrumentation.akkahttp106;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;

import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import net.bytebuddy.asm.Advice;
import scala.concurrent.Future;

Expand All @@ -29,10 +29,8 @@ public static AgentScope methodEnter(
AkkaHttpClientDecorator.DECORATE.onRequest(span, request);

if (request != null) {
Propagators.defaultPropagator().inject(span, request, headers);
propagate()
.injectPathwayContext(
span, request, headers, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package datadog.trace.instrumentation.apachehttpasyncclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpasyncclient.ApacheHttpAsyncClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpasyncclient.HttpHeadersInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import java.io.IOException;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
Expand Down Expand Up @@ -35,9 +35,8 @@ public HttpRequest generateRequest() throws IOException, HttpException {
final HttpRequest request = delegate.generateRequest();
DECORATE.onRequest(span, new HostAndRequestAsHttpUriRequest(delegate.getTarget(), request));

Propagators.defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, SETTER);

return request;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package datadog.trace.instrumentation.apachehttpclient;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient.ApacheHttpClientDecorator.HTTP_REQUEST;
import static datadog.trace.instrumentation.apachehttpclient.HttpHeadersInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
Expand Down Expand Up @@ -46,10 +46,8 @@ private static AgentScope activateHttpSpan(final HttpUriRequest request) {

// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!awsClientCall) {
Propagators.defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(
span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, SETTER);
}

return scope;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package datadog.trace.instrumentation.apachehttpclient5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient5.HttpHeadersInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import java.io.IOException;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.HttpException;
Expand All @@ -28,9 +28,8 @@ public void sendRequest(HttpRequest request, EntityDetails entityDetails, HttpCo
throws HttpException, IOException {
DECORATE.onRequest(span, request);

Propagators.defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, SETTER);
delegate.sendRequest(request, entityDetails, context);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package datadog.trace.instrumentation.apachehttpclient5;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.apachehttpclient5.ApacheHttpClientDecorator.HTTP_REQUEST;
import static datadog.trace.instrumentation.apachehttpclient5.HttpHeadersInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import org.apache.hc.client5.http.classic.HttpClient;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
Expand Down Expand Up @@ -46,10 +46,8 @@ private static AgentScope activateHttpSpan(final HttpRequest request) {
final boolean awsClientCall = request.containsHeader("amz-sdk-invocation-id");
// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!awsClientCall) {
Propagators.defaultPropagator().inject(span, request, SETTER);
propagate()
.injectPathwayContext(
span, request, SETTER, HttpClientDecorator.CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), request, SETTER);
}

return scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.CLIENT_PATHWAY_EDGE_TAGS;
import static datadog.trace.instrumentation.armeria.grpc.client.GrpcClientDecorator.DECORATE;
Expand All @@ -21,6 +20,7 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.agent.tooling.muzzle.Reference;
import datadog.trace.api.InstrumenterConfig;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand Down Expand Up @@ -121,8 +121,8 @@ public static <T> AgentScope before(
if (null != responseListener && null != headers) {
span = InstrumentationContext.get(ClientCall.class, AgentSpan.class).get(call);
if (null != span) {
Propagators.defaultPropagator().inject(span, headers, SETTER);
propagate().injectPathwayContext(span, headers, SETTER, CLIENT_PATHWAY_EDGE_TAGS);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(CLIENT_PATHWAY_EDGE_TAGS);
Propagators.defaultPropagator().inject(span.with(dsmContext), headers, SETTER);
return activateSpan(span);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.instrumentation.aws.v2.eventbridge;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.BUS_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
Expand All @@ -9,6 +8,7 @@
import static datadog.trace.instrumentation.aws.v2.eventbridge.TextMapInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
Expand Down Expand Up @@ -86,12 +86,13 @@ private String getTraceContextToInject(
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append('{');

// Inject trace context
Propagators.defaultPropagator().inject(span, jsonBuilder, SETTER);

// Inject context
datadog.context.Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(eventBusName));
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(eventBusName));
context = context.with(dsmContext);
}
Propagators.defaultPropagator().inject(context, jsonBuilder, SETTER);

// Add bus name and start time
jsonBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.instrumentation.aws.v1.sns;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
Expand All @@ -14,7 +13,9 @@
import com.amazonaws.services.sns.model.PublishBatchRequest;
import com.amazonaws.services.sns.model.PublishBatchRequestEntry;
import com.amazonaws.services.sns.model.PublishRequest;
import datadog.context.Context;
import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
Expand All @@ -37,10 +38,12 @@ private ByteBuffer getMessageAttributeValueToInject(
final AgentSpan span = newSpan(request);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append('{');
Propagators.defaultPropagator().inject(span, jsonBuilder, SETTER);
Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(snsTopicName));
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(snsTopicName));
context = context.with(dsmContext);
}
Propagators.defaultPropagator().inject(context, jsonBuilder, SETTER);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append('}');
return ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package datadog.trace.instrumentation.aws.v2.sns;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
Expand All @@ -9,6 +8,7 @@
import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER;

import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -38,10 +38,12 @@ private SdkBytes getMessageAttributeValueToInject(
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append('{');
Propagators.defaultPropagator().inject(span, jsonBuilder, SETTER);
datadog.context.Context context = span;
if (traceConfig().isDataStreamsEnabled()) {
propagate().injectPathwayContext(span, jsonBuilder, SETTER, getTags(snsTopicName));
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(snsTopicName));
context = context.with(dsmContext);
}
Propagators.defaultPropagator().inject(context, jsonBuilder, SETTER);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append('}');
return SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
Expand All @@ -16,6 +16,10 @@
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import datadog.context.Context;
import datadog.context.propagation.Propagator;
import datadog.context.propagation.Propagators;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.LinkedHashMap;
Expand All @@ -36,23 +40,21 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
String queueUrl = smRequest.getQueueUrl();
if (queueUrl == null) return request;

LinkedHashMap<String, String> sortedTags = getTags(queueUrl);

final AgentSpan span = newSpan(request);
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
propagate().injectPathwayContext(span, smRequest.getMessageAttributes(), SETTER, sortedTags);
dsmPropagator.inject(context, smRequest.getMessageAttributes(), SETTER);
} else if (request instanceof SendMessageBatchRequest) {
SendMessageBatchRequest smbRequest = (SendMessageBatchRequest) request;

String queueUrl = smbRequest.getQueueUrl();
if (queueUrl == null) return request;

LinkedHashMap<String, String> sortedTags = getTags(queueUrl);

final AgentSpan span = newSpan(request);
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
Context context = newContext(request, queueUrl);
for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) {
propagate().injectPathwayContext(span, entry.getMessageAttributes(), SETTER, sortedTags);
dsmPropagator.inject(context, entry.getMessageAttributes(), SETTER);
}
} else if (request instanceof ReceiveMessageRequest) {
ReceiveMessageRequest rmRequest = (ReceiveMessageRequest) request;
Expand All @@ -64,8 +66,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
return request;
}

private Context newContext(AmazonWebServiceRequest request, String queueUrl) {
AgentSpan span = newSpan(request);
DataStreamsContext dsmContext = DataStreamsContext.fromTags(getTags(queueUrl));
return span.with(dsmContext);
}

private AgentSpan newSpan(AmazonWebServiceRequest request) {
final AgentSpan span = startSpan("aws.sqs.send");
final AgentSpan span = startSpan("sqs", "aws.sqs.send");
// pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched &
// activated
contextStore.put(request, span);
Expand Down
Loading

0 comments on commit 3f7c076

Please sign in to comment.