Skip to content

Commit

Permalink
feat(dsm): Clean up API
Browse files Browse the repository at this point in the history
Remove DSM injection method from AgentPropagation
Make checkpoint use DataStreamsContext
  • Loading branch information
PerfectSlayer committed Feb 4, 2025
1 parent 5afe264 commit f5e6dc1
Show file tree
Hide file tree
Showing 29 changed files with 96 additions and 386 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static datadog.trace.api.cache.RadixTreeCache.UNSET_STATUS;
import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.traceConfig;
import static datadog.trace.bootstrap.instrumentation.decorator.http.HttpResourceDecorator.HTTP_RESOURCE_DECORATOR;
Expand Down Expand Up @@ -148,7 +149,7 @@ public AgentSpan startSpan(
}
AgentPropagation.ContextVisitor<REQUEST_CARRIER> getter = getter();
if (null != carrier && null != getter) {
tracer().getDataStreamsMonitoring().setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
tracer().getDataStreamsMonitoring().setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));
}
return span;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.armeria.grpc.server;

import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -71,7 +72,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
.setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));

RequestContext reqContext = span.getRequestContext();
if (reqContext != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.ResourceNamePriorities.RPC_COMMAND_NAME;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;

Expand Down Expand Up @@ -264,7 +265,7 @@ public AgentSpan onServiceResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, responseSize);
.setCheckpoint(span, create(sortedTags, 0, responseSize));
}

if ("PutObjectRequest".equalsIgnoreCase(awsOperation)
Expand All @@ -285,7 +286,7 @@ public AgentSpan onServiceResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, payloadSize);
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.blackholeSpan;
Expand All @@ -19,6 +20,7 @@
import datadog.context.propagation.Propagators;
import datadog.trace.api.Config;
import datadog.trace.api.datastreams.AgentDataStreamsMonitoring;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.datastreams.PathwayContext;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
Expand Down Expand Up @@ -122,8 +124,8 @@ public void afterResponse(final Request<?> request, final Response<?> response)
AgentDataStreamsMonitoring dataStreamsMonitoring =
AgentTracer.get().getDataStreamsMonitoring();
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
pathwayContext.setCheckpoint(
sortedTags, dataStreamsMonitoring::add, arrivalTime.getTime());
DataStreamsContext context = create(sortedTags, arrivalTime.getTime(), 0);
pathwayContext.setCheckpoint(context, dataStreamsMonitoring::add);
if (!span.context().getPathwayContext().isStarted()) {
span.context().mergePathwayContext(pathwayContext);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v2;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
Expand Down Expand Up @@ -361,7 +362,8 @@ public AgentSpan onSdkResponse(
AgentTracer.get().getDataStreamsMonitoring();
PathwayContext pathwayContext = dataStreamsMonitoring.newPathwayContext();
pathwayContext.setCheckpoint(
sortedTags, dataStreamsMonitoring::add, arrivalTime.toEpochMilli());
create(sortedTags, arrivalTime.toEpochMilli(), 0),
dataStreamsMonitoring::add);
if (!span.context().getPathwayContext().isStarted()) {
span.context().mergePathwayContext(pathwayContext);
}
Expand Down Expand Up @@ -391,7 +393,7 @@ public AgentSpan onSdkResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, responseSize);
.setCheckpoint(span, create(sortedTags, 0, responseSize));
}

if ("PutObject".equalsIgnoreCase(awsOperation)) {
Expand All @@ -411,7 +413,7 @@ public AgentSpan onSdkResponse(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, 0, payloadSize);
.setCheckpoint(span, create(sortedTags, 0, payloadSize));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -90,7 +91,7 @@ protected void startNewMessageSpan(Message message) {
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0));

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v2.sqs;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -92,7 +93,7 @@ protected void startNewMessageSpan(Message message) {
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, create(sortedTags, 0, 0));

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl, requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.trace.api.Functions;
import datadog.trace.api.cache.DDCache;
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.datastreams.DataStreamsContext;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
Expand Down Expand Up @@ -141,9 +142,10 @@ public AgentSpan onConsume(final PubsubMessage message, final String subscriptio
.getDataStreamsMonitoring()
.setCheckpoint(
span,
sortedTags,
publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6,
message.getSerializedSize());
DataStreamsContext.create(
sortedTags,
publishTime.getSeconds() * 1_000 + publishTime.getNanos() / (int) 1e6,
message.getSerializedSize()));
afterStart(span);
span.setResourceName(
CONSUMER_RESOURCE_NAME_CACHE.computeIfAbsent(parsedSubscription, CONSUMER_PREFIX));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.grpc.server;

import static datadog.trace.api.datastreams.DataStreamsContext.fromTags;
import static datadog.trace.api.gateway.Events.EVENTS;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -70,7 +71,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(

AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, SERVER_PATHWAY_EDGE_TAGS, 0, 0);
.setCheckpoint(span, fromTags(SERVER_PATHWAY_EDGE_TAGS));

RequestContext reqContext = span.getRequestContext();
if (reqContext != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package datadog.trace.instrumentation.kafka_clients;

import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
Expand Down Expand Up @@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
if (STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
.setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize));
} else {
// when we're in a streaming context we want to consume only from source topics
if (STREAMING_CONTEXT.isSourceTopic(val.topic())) {
Expand All @@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
// some other instance of the application, breaking the context propagation
// for DSM users
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize);
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package datadog.trace.instrumentation.kafka_clients38;

import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateNext;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
Expand Down Expand Up @@ -113,7 +113,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
if (StreamingContext.STREAMING_CONTEXT.isDisabledForTopic(val.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, val.timestamp(), payloadSize);
.setCheckpoint(span, create(sortedTags, val.timestamp(), payloadSize));
} else {
// when we're in a streaming context we want to consume only from source topics
if (StreamingContext.STREAMING_CONTEXT.isSourceTopic(val.topic())) {
Expand All @@ -122,7 +122,7 @@ protected void startNewRecordSpan(ConsumerRecord<?, ?> val) {
// some other instance of the application, breaking the context propagation
// for DSM users
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
DataStreamsContext dsmContext = fromKafka(sortedTags, val.timestamp(), payloadSize);
DataStreamsContext dsmContext = create(sortedTags, val.timestamp(), payloadSize);
dsmPropagator.inject(span.with(dsmContext), val.headers(), SETTER);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package datadog.trace.instrumentation.kafka_streams;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.api.datastreams.DataStreamsContext.fromKafka;
import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
Expand Down Expand Up @@ -265,11 +265,11 @@ public static void start(
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp, payloadSize);
.setCheckpoint(span, create(sortedTags, record.timestamp, payloadSize));
} else {
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp, payloadSize);
DataStreamsContext dsmContext = create(sortedTags, record.timestamp, payloadSize);
dsmPropagator.inject(span.with(dsmContext), record, SR_SETTER);
}
}
Expand Down Expand Up @@ -347,11 +347,11 @@ public static void start(
if (STREAMING_CONTEXT.isDisabledForTopic(record.topic())) {
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, record.timestamp(), payloadSize);
.setCheckpoint(span, create(sortedTags, record.timestamp(), payloadSize));
} else {
if (STREAMING_CONTEXT.isSourceTopic(record.topic())) {
Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN);
DataStreamsContext dsmContext = fromKafka(sortedTags, record.timestamp(), payloadSize);
DataStreamsContext dsmContext = create(sortedTags, record.timestamp(), payloadSize);
dsmPropagator.inject(span.with(dsmContext), record, PR_SETTER);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.rabbitmq.amqp;

import static datadog.trace.api.datastreams.DataStreamsContext.create;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
Expand Down Expand Up @@ -253,7 +254,7 @@ public static AgentScope startReceivingSpan(
sortedTags.put(TYPE_TAG, "rabbitmq");
AgentTracer.get()
.getDataStreamsMonitoring()
.setCheckpoint(span, sortedTags, produceMillis, 0);
.setCheckpoint(span, create(sortedTags, produceMillis, 0));
}

CONSUMER_DECORATE.afterStart(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.core.monitor.MonitoringImpl;
import datadog.trace.core.monitor.TracerHealthMetrics;
import datadog.trace.core.propagation.CorePropagation;
import datadog.trace.core.propagation.ExtractedContext;
import datadog.trace.core.propagation.HttpCodec;
import datadog.trace.core.propagation.PropagationTags;
Expand Down Expand Up @@ -228,7 +227,7 @@ public static CoreTracerBuilder builder() {
private final SortedSet<TraceInterceptor> interceptors =
new ConcurrentSkipListSet<>(Comparator.comparingInt(TraceInterceptor::priority));

private final CorePropagation propagation;
private final AgentPropagation propagation;
private final boolean logs128bTraceIdEnabled;

private final InstrumentationGateway instrumentationGateway;
Expand Down Expand Up @@ -714,8 +713,8 @@ private CoreTracer(

sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);

// Store all propagators to propagation -- only DSM injection left
this.propagation = new CorePropagation(this.dataStreamsMonitoring.injector());
// TODO Need to be removed
this.propagation = new AgentPropagation() {};

// Register context propagators
HttpCodec.Extractor tracingExtractor =
Expand Down

This file was deleted.

Loading

0 comments on commit f5e6dc1

Please sign in to comment.