Skip to content

Commit b9bb806

Browse files
committed
feat(core): Move DSM extractor to its own propagator
1 parent edd1220 commit b9bb806

File tree

8 files changed

+101
-241
lines changed

8 files changed

+101
-241
lines changed

dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static datadog.trace.api.DDTags.DJM_ENABLED;
66
import static datadog.trace.api.DDTags.DSM_ENABLED;
77
import static datadog.trace.api.DDTags.PROFILING_CONTEXT_ENGINE;
8+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
89
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.STANDALONE_ASM_CONCERN;
910
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.TRACING_CONCERN;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.XRAY_TRACING_CONCERN;
@@ -712,17 +713,21 @@ private CoreTracer(
712713

713714
sharedCommunicationObjects.whenReady(this.dataStreamsMonitoring::start);
714715

715-
// Create default extractor from config if not provided and decorate it with DSM extractor
716-
HttpCodec.Extractor builtExtractor =
717-
extractor == null ? HttpCodec.createExtractor(config, this::captureTraceConfig) : extractor;
718-
builtExtractor = this.dataStreamsMonitoring.extractor(builtExtractor);
719-
// Store all propagators to propagation
720-
this.propagation = new CorePropagation(builtExtractor, this.dataStreamsMonitoring.injector());
716+
// Store all propagators to propagation -- only DSM injection left
717+
this.propagation = new CorePropagation(this.dataStreamsMonitoring.injector());
721718

719+
// Register context propagators
720+
HttpCodec.Extractor tracingExtractor =
721+
extractor == null ? HttpCodec.createExtractor(config, this::captureTraceConfig) : extractor;
722722
boolean appSec = config.isAppSecStandaloneEnabled();
723+
boolean dsm = config.isDataStreamsEnabled();
723724
Propagators.register(STANDALONE_ASM_CONCERN, new StandaloneAsmPropagator(), appSec);
724-
Propagators.register(TRACING_CONCERN, new TracingPropagator(injector, extractor), !appSec);
725+
Propagators.register(
726+
TRACING_CONCERN, new TracingPropagator(injector, tracingExtractor), !appSec);
725727
Propagators.register(XRAY_TRACING_CONCERN, new XRayPropagator(config), false);
728+
if (dsm) {
729+
Propagators.register(DSM_CONCERN, this.dataStreamsMonitoring.propagator());
730+
}
726731

727732
this.tagInterceptor =
728733
null == tagInterceptor ? new TagInterceptor(new RuleFlags(config)) : tagInterceptor;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package datadog.trace.core.datastreams;
2+
3+
import datadog.context.Context;
4+
import datadog.context.propagation.CarrierSetter;
5+
import datadog.context.propagation.CarrierVisitor;
6+
import datadog.context.propagation.Propagator;
7+
import datadog.trace.api.time.TimeSource;
8+
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
9+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
10+
import datadog.trace.bootstrap.instrumentation.api.TagContext;
11+
import javax.annotation.ParametersAreNonnullByDefault;
12+
13+
// TODO Javadoc
14+
@ParametersAreNonnullByDefault
15+
public class DataStreamPropagator implements Propagator {
16+
private final TimeSource timeSource;
17+
private final long hashOfKnownTags;
18+
private final String serviceNameOverride;
19+
20+
public DataStreamPropagator(
21+
TimeSource timeSource, long hashOfKnownTags, String serviceNameOverride) {
22+
this.timeSource = timeSource;
23+
this.hashOfKnownTags = hashOfKnownTags;
24+
this.serviceNameOverride = serviceNameOverride;
25+
}
26+
27+
@Override
28+
public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
29+
// TODO Still in CorePropagation, not migrated yet
30+
}
31+
32+
@Override
33+
public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor) {
34+
// Extract pathway context
35+
DefaultPathwayContext pathwayContext =
36+
DefaultPathwayContext.extract(
37+
carrier, visitor, this.timeSource, this.hashOfKnownTags, this.serviceNameOverride);
38+
if (pathwayContext == null) {
39+
return context;
40+
}
41+
// TODO Pathway context needs to be store into its own context element
42+
// Get span context to store pathway context into
43+
TagContext spanContext;
44+
AgentSpan extractedSpan = AgentSpan.fromContext(context);
45+
AgentSpanContext extractedSpanContext;
46+
if (extractedSpan != null
47+
&& (extractedSpanContext = extractedSpan.context()) instanceof TagContext) {
48+
spanContext = (TagContext) extractedSpanContext;
49+
} else {
50+
spanContext = new TagContext();
51+
AgentSpan span = AgentSpan.fromSpanContext(spanContext);
52+
context = Context.root().with(span);
53+
}
54+
spanContext.withPathwayContext(pathwayContext);
55+
// Return the original context, with mutated span context
56+
return context;
57+
}
58+
}

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsMonitoring.java

+8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package datadog.trace.core.datastreams;
22

3+
import datadog.context.propagation.Propagator;
34
import datadog.trace.api.experimental.DataStreamsContextCarrier;
45
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
56
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
@@ -10,6 +11,13 @@
1011
public interface DataStreamsMonitoring extends AgentDataStreamsMonitoring, AutoCloseable {
1112
void start();
1213

14+
/**
15+
* Gets the propagator for DSM concern.
16+
*
17+
* @return The propagator for DSM concern.
18+
*/
19+
Propagator propagator();
20+
1321
/**
1422
* Get a context extractor that support {@link PathwayContext} extraction.
1523
*

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import datadog.communication.ddagent.DDAgentFeaturesDiscovery;
1616
import datadog.communication.ddagent.SharedCommunicationObjects;
17+
import datadog.context.propagation.Propagator;
1718
import datadog.trace.api.Config;
1819
import datadog.trace.api.TraceConfig;
1920
import datadog.trace.api.WellKnownTags;
@@ -200,6 +201,11 @@ public PathwayContext newPathwayContext() {
200201
}
201202
}
202203

204+
@Override
205+
public Propagator propagator() {
206+
return new DataStreamPropagator(this.timeSource, this.hashOfKnownTags, getThreadServiceName());
207+
}
208+
203209
@Override
204210
public HttpCodec.Extractor extractor(HttpCodec.Extractor delegate) {
205211
return new DataStreamContextExtractor(

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java

+7-8
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
import com.datadoghq.sketch.ddsketch.encoding.ByteArrayInput;
88
import com.datadoghq.sketch.ddsketch.encoding.GrowingByteArrayOutput;
99
import com.datadoghq.sketch.ddsketch.encoding.VarEncodingHelper;
10+
import datadog.context.propagation.CarrierVisitor;
1011
import datadog.trace.api.Config;
1112
import datadog.trace.api.WellKnownTags;
1213
import datadog.trace.api.time.TimeSource;
13-
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
1414
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
1515
import datadog.trace.bootstrap.instrumentation.api.StatsPoint;
1616
import datadog.trace.util.FNV64Hash;
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.locks.Lock;
2828
import java.util.concurrent.locks.ReentrantLock;
29+
import java.util.function.BiConsumer;
2930
import java.util.function.Consumer;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -267,7 +268,7 @@ public String toString() {
267268
}
268269
}
269270

270-
private static class PathwayContextExtractor implements AgentPropagation.KeyClassifier {
271+
private static class PathwayContextExtractor implements BiConsumer<String, String> {
271272
private final TimeSource timeSource;
272273
private final long hashOfKnownTags;
273274
private final String serviceNameOverride;
@@ -281,27 +282,25 @@ private static class PathwayContextExtractor implements AgentPropagation.KeyClas
281282
}
282283

283284
@Override
284-
public boolean accept(String key, String value) {
285+
public void accept(String key, String value) {
285286
if (PROPAGATION_KEY_BASE64.equalsIgnoreCase(key)) {
286287
try {
287288
extractedContext = decode(timeSource, hashOfKnownTags, serviceNameOverride, value);
288-
} catch (IOException e) {
289-
return false;
289+
} catch (IOException ignored) {
290290
}
291291
}
292-
return true;
293292
}
294293
}
295294

296295
static <C> DefaultPathwayContext extract(
297296
C carrier,
298-
AgentPropagation.ContextVisitor<C> getter,
297+
CarrierVisitor<C> getter,
299298
TimeSource timeSource,
300299
long hashOfKnownTags,
301300
String serviceNameOverride) {
302301
PathwayContextExtractor pathwayContextExtractor =
303302
new PathwayContextExtractor(timeSource, hashOfKnownTags, serviceNameOverride);
304-
getter.forEachKey(carrier, pathwayContextExtractor);
303+
getter.forEachKeyValue(carrier, pathwayContextExtractor);
305304
if (pathwayContextExtractor.extractedContext == null) {
306305
log.debug("No context extracted");
307306
} else {

dd-trace-core/src/main/java/datadog/trace/core/propagation/CorePropagation.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.core.propagation;
22

3+
import datadog.context.Context;
4+
import datadog.context.propagation.Propagators;
35
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
46
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
57
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
@@ -8,18 +10,14 @@
810

911
public class CorePropagation implements AgentPropagation {
1012
private final DataStreamContextInjector dataStreamContextInjector;
11-
private final HttpCodec.Extractor extractor;
1213

1314
/**
1415
* Constructor
1516
*
16-
* @param extractor The context extractor.
1717
* @param dataStreamContextInjector The DSM context injector, as a specific object until generic
1818
* context injection is available.
1919
*/
20-
public CorePropagation(
21-
HttpCodec.Extractor extractor, DataStreamContextInjector dataStreamContextInjector) {
22-
this.extractor = extractor;
20+
public CorePropagation(DataStreamContextInjector dataStreamContextInjector) {
2321
this.dataStreamContextInjector = dataStreamContextInjector;
2422
}
2523

@@ -50,6 +48,8 @@ public <C> void injectPathwayContextWithoutSendingStats(
5048

5149
@Override
5250
public <C> AgentSpanContext.Extracted extract(final C carrier, final ContextVisitor<C> getter) {
53-
return extractor.extract(carrier, getter);
51+
Context extracted = Propagators.defaultPropagator().extract(Context.root(), carrier, getter);
52+
AgentSpan extractedSpan = AgentSpan.fromContext(extracted);
53+
return extractedSpan == null ? null : (AgentSpanContext.Extracted) extractedSpan.context();
5454
}
5555
}

0 commit comments

Comments
 (0)