Skip to content

Commit 1d5eedd

Browse files
committed
feat(dsm): Clean up API
Remove DSM injection method from AgentPropagation Make checkpoint use DataStreamsContext
1 parent fbee1b2 commit 1d5eedd

File tree

32 files changed

+185
-470
lines changed

32 files changed

+185
-470
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/decorator/HttpServerDecorator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package datadog.trace.bootstrap.instrumentation.decorator;
22

33
import static datadog.trace.api.cache.RadixTreeCache.UNSET_STATUS;
4+
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
45
import static datadog.trace.api.gateway.Events.EVENTS;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
67
import static datadog.trace.bootstrap.instrumentation.decorator.http.HttpResourceDecorator.HTTP_RESOURCE_DECORATOR;
@@ -148,7 +149,7 @@ public AgentSpan startSpan(
148149
}
149150
AgentPropagation.ContextVisitor<REQUEST_CARRIER> getter = getter();
150151
if (null != carrier && null != getter) {
151-
tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
152+
tracer().getDataStreamsMonitoring().setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));
152153
}
153154
return span;
154155
}

dd-java-agent/instrumentation/armeria-grpc/src/main/java/datadog/trace/instrumentation/armeria/grpc/server/TracingServerInterceptor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.armeria.grpc.server;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
34
import static datadog.trace.api.gateway.Events.EVENTS;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -71,7 +72,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
7172

7273
AgentTracer.get()
7374
.getDataStreamsMonitoring()
74-
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
75+
.setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));
7576

7677
RequestContext reqContext = span.getRequestContext();
7778
if (reqContext != null) {

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/AwsSdkClientDecorator.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.aws.v0;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
45
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
56

@@ -264,7 +265,7 @@ public AgentSpan onServiceResponse(
264265

265266
AgentTracer.get()
266267
.getDataStreamsMonitoring()
267-
.setCheckpoint(span, sortedTags, 0, responseSize);
268+
.setCheckpoint(span, create(sortedTags, 0, responseSize));
268269
}
269270

270271
if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
@@ -285,7 +286,7 @@ public AgentSpan onServiceResponse(
285286

286287
AgentTracer.get()
287288
.getDataStreamsMonitoring()
288-
.setCheckpoint(span, sortedTags, 0, payloadSize);
289+
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
289290
}
290291
}
291292
}

dd-java-agent/instrumentation/aws-java-sdk-1.11.0/src/main/java/datadog/trace/instrumentation/aws/v0/TracingRequestHandler.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.aws.v0;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
@@ -19,6 +20,7 @@
1920
import datadog.context.propagation.Propagators;
2021
import datadog.trace.api.Config;
2122
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
23+
import datadog.trace.api.datastreams.DataStreamsContext;
2224
import datadog.trace.api.datastreams.PathwayContext;
2325
import datadog.trace.bootstrap.ContextStore;
2426
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -122,8 +124,8 @@ public void afterResponse(final Request<?> request, final Response<?> response)
122124
AgentDataStreamsMonitoring dataStreamsMonitoring =
123125
AgentTracer.get().getDataStreamsMonitoring();
124126
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
125-
pathwayContext.setCheckpoint(
126-
sortedTags, dataStreamsMonitoring::add, arrivalTime.getTime());
127+
DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0);
128+
pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add);
127129
if (!span.context().getPathwayContext().isStarted()) {
128130
span.context().mergePathwayContext(pathwayContext);
129131
}

dd-java-agent/instrumentation/aws-java-sdk-2.2/src/main/java/datadog/trace/instrumentation/aws/v2/AwsSdkClientDecorator.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.aws.v2;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
45
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
56
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
@@ -361,7 +362,8 @@ public AgentSpan onSdkResponse(
361362
AgentTracer.get().getDataStreamsMonitoring();
362363
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
363364
pathwayContext.setCheckpoint(
364-
sortedTags, dataStreamsMonitoring::add, arrivalTime.toEpochMilli());
365+
create(sortedTags, arrivalTime.toEpochMilli(), 0),
366+
dataStreamsMonitoring::add);
365367
if (!span.context().getPathwayContext().isStarted()) {
366368
span.context().mergePathwayContext(pathwayContext);
367369
}
@@ -391,7 +393,7 @@ public AgentSpan onSdkResponse(
391393

392394
AgentTracer.get()
393395
.getDataStreamsMonitoring()
394-
.setCheckpoint(span, sortedTags, 0, responseSize);
396+
.setCheckpoint(span, create(sortedTags, 0, responseSize));
395397
}
396398

397399
if ("PutObject".equalsIgnoreCase(awsOperation)) {
@@ -411,7 +413,7 @@ public AgentSpan onSdkResponse(
411413

412414
AgentTracer.get()
413415
.getDataStreamsMonitoring()
414-
.setCheckpoint(span, sortedTags, 0, payloadSize);
416+
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
415417
}
416418
}
417419
}

dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/TracingIterator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.aws.v1.sqs;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -90,7 +91,7 @@ protected void startNewMessageSpan(Message message) {
9091
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
9192
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
9293
sortedTags.put(TYPE_TAG, "sqs");
93-
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);
94+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0));
9495

9596
CONSUMER_DECORATE.afterStart(span);
9697
CONSUMER_DECORATE.onConsume(span, queueUrl);

dd-java-agent/instrumentation/aws-java-sqs-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sqs/TracingIterator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.aws.v2.sqs;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -92,7 +93,7 @@ protected void startNewMessageSpan(Message message) {
9293
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
9394
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
9495
sortedTags.put(TYPE_TAG, "sqs");
95-
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);
96+
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0));
9697

9798
CONSUMER_DECORATE.afterStart(span);
9899
CONSUMER_DECORATE.onConsume(span, queueUrl, requestId);

dd-java-agent/instrumentation/google-pubsub/src/main/java/datadog/trace/instrumentation/googlepubsub/PubSubDecorator.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.trace.api.Functions;
1414
import datadog.trace.api.cache.DDCache;
1515
import datadog.trace.api.cache.DDCaches;
16+
import datadog.trace.api.datastreams.DataStreamsContext;
1617
import datadog.trace.api.naming.SpanNaming;
1718
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1819
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
@@ -141,9 +142,10 @@ public AgentSpan onConsume(final PubsubMessage message, final String subscriptio
141142
.getDataStreamsMonitoring()
142143
.setCheckpoint(
143144
span,
144-
sortedTags,
145-
publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6,
146-
message.getSerializedSize());
145+
DataStreamsContext.create(
146+
sortedTags,
147+
publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6,
148+
message.getSerializedSize()));
147149
afterStart(span);
148150
span.setResourceName(
149151
CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(parsedSubscription, CONSUMER_PREFIX));

dd-java-agent/instrumentation/grpc-1.5/src/main/java/datadog/trace/instrumentation/grpc/server/TracingServerInterceptor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.grpc.server;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
34
import static datadog.trace.api.gateway.Events.EVENTS;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -70,7 +71,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
7071

7172
AgentTracer.get()
7273
.getDataStreamsMonitoring()
73-
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
74+
.setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));
7475

7576
RequestContext reqContext = span.getRequestContext();
7677
if (reqContext != null) {

dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients;
22

3-
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
66
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
@@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
113113
if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
114114
AgentTracer.get()
115115
.getDataStreamsMonitoring()
116-
.setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
116+
.setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize));
117117
} else {
118118
// when we're in a streaming context we want to consume only from source topics
119119
if (STREAMING_CONTEXT.isSourceTopic(val.topic())) {
@@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
122122
// some other instance of the application, breaking the context propagation
123123
// for DSM users
124124
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
125-
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
125+
DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize);
126126
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
127127
}
128128
}

dd-java-agent/instrumentation/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TracingIterator.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package datadog.trace.instrumentation.kafka_clients38;
22

3-
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
66
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
@@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
113113
if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
114114
AgentTracer.get()
115115
.getDataStreamsMonitoring()
116-
.setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
116+
.setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize));
117117
} else {
118118
// when we're in a streaming context we want to consume only from source topics
119119
if (StreamingContext.STREAMING_CONTEXT.isSourceTopic(val.topic())) {
@@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
122122
// some other instance of the application, breaking the context propagation
123123
// for DSM users
124124
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
125-
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
125+
DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize);
126126
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
127127
}
128128
}

dd-java-agent/instrumentation/kafka-streams-0.11/src/main/java/datadog/trace/instrumentation/kafka_streams/KafkaStreamTaskInstrumentation.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package datadog.trace.instrumentation.kafka_streams;
22

33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4-
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
4+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
66
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
77
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
@@ -265,11 +265,11 @@ public static void start(
265265
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) {
266266
AgentTracer.get()
267267
.getDataStreamsMonitoring()
268-
.setCheckpoint(span, sortedTags, record.timestamp, payloadSize);
268+
.setCheckpoint(span, create(sortedTags, record.timestamp, payloadSize));
269269
} else {
270270
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
271271
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
272-
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp, payloadSize);
272+
DataStreamsContext dsmContext = create(sortedTags, record.timestamp, payloadSize);
273273
dsmPropagator.inject(span.with(dsmContext), record, SR_SETTER);
274274
}
275275
}
@@ -347,11 +347,11 @@ public static void start(
347347
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) {
348348
AgentTracer.get()
349349
.getDataStreamsMonitoring()
350-
.setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
350+
.setCheckpoint(span, create(sortedTags, record.timestamp(), payloadSize));
351351
} else {
352352
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
353353
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
354-
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp(), payloadSize);
354+
DataStreamsContext dsmContext = create(sortedTags, record.timestamp(), payloadSize);
355355
dsmPropagator.inject(span.with(dsmContext), record, PR_SETTER);
356356
}
357357
}

dd-java-agent/instrumentation/rabbitmq-amqp-2.7/src/main/java/datadog/trace/instrumentation/rabbitmq/amqp/RabbitDecorator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.instrumentation.rabbitmq.amqp;
22

3+
import static datadog.trace.api.datastreams.DataStreamsContext.create;
34
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
45
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
56
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -253,7 +254,7 @@ public static AgentScope startReceivingSpan(
253254
sortedTags.put(TYPE_TAG, "rabbitmq");
254255
AgentTracer.get()
255256
.getDataStreamsMonitoring()
256-
.setCheckpoint(span, sortedTags, produceMillis, 0);
257+
.setCheckpoint(span, create(sortedTags, produceMillis, 0));
257258
}
258259

259260
CONSUMER_DECORATE.afterStart(span);

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
import datadog.trace.core.monitor.HealthMetrics;
8787
import datadog.trace.core.monitor.MonitoringImpl;
8888
import datadog.trace.core.monitor.TracerHealthMetrics;
89-
import datadog.trace.core.propagation.CorePropagation;
9089
import datadog.trace.core.propagation.ExtractedContext;
9190
import datadog.trace.core.propagation.HttpCodec;
9291
import datadog.trace.core.propagation.PropagationTags;
@@ -228,7 +227,7 @@ public static CoreTracerBuilder builder() {
228227
private final SortedSet<TraceInterceptor> interceptors =
229228
new ConcurrentSkipListSet<>(Comparator.comparingInt(TraceInterceptor::priority));
230229

231-
private final CorePropagation propagation;
230+
private final AgentPropagation propagation;
232231
private final boolean logs128bTraceIdEnabled;
233232

234233
private final InstrumentationGateway instrumentationGateway;
@@ -714,8 +713,8 @@ private CoreTracer(
714713

715714
sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);
716715

717-
// Store all propagators to propagation -- only DSM injection left
718-
this.propagation = new CorePropagation(this.dataStreamsMonitoring.injector());
716+
// TODO Need to be removed
717+
this.propagation = AgentTracer.NOOP_TRACER.propagate();
719718

720719
// Register context propagators
721720
HttpCodec.Extractor tracingExtractor =

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamContextExtractor.java

-64
This file was deleted.

0 commit comments

Comments
 (0)