Skip to content

Commit 778cadd

Browse files
committed
feat(dsm): Implement DSM context injection
1 parent 76a2870 commit 778cadd

File tree

3 files changed

+116
-6
lines changed

3 files changed

+116
-6
lines changed

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamPropagator.java renamed to dd-trace-core/src/main/java/datadog/trace/core/datastreams/DataStreamsPropagator.java

+48-4
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,40 @@
11
package datadog.trace.core.datastreams;
22

3+
import static datadog.trace.api.DDTags.PATHWAY_HASH;
4+
import static datadog.trace.bootstrap.instrumentation.api.PathwayContext.PROPAGATION_KEY_BASE64;
5+
36
import datadog.context.Context;
47
import datadog.context.propagation.CarrierSetter;
58
import datadog.context.propagation.CarrierVisitor;
69
import datadog.context.propagation.Propagator;
710
import datadog.trace.api.TraceConfig;
11+
import datadog.trace.api.datastreams.DataStreamsContext;
812
import datadog.trace.api.time.TimeSource;
913
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
1014
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
1115
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
1216
import datadog.trace.bootstrap.instrumentation.api.TagContext;
17+
import java.io.IOException;
1318
import java.util.function.Supplier;
1419
import javax.annotation.Nullable;
1520
import javax.annotation.ParametersAreNonnullByDefault;
1621

1722
// TODO Javadoc
1823
@ParametersAreNonnullByDefault
19-
public class DataStreamPropagator implements Propagator {
24+
public class DataStreamsPropagator implements Propagator {
25+
private final DataStreamsMonitoring dataStreamsMonitoring;
2026
private final Supplier<TraceConfig> traceConfigSupplier;
2127
private final TimeSource timeSource;
2228
private final long hashOfKnownTags;
2329
private final String serviceNameOverride;
2430

25-
public DataStreamPropagator(
31+
public DataStreamsPropagator(
32+
DataStreamsMonitoring dataStreamsMonitoring,
2633
Supplier<TraceConfig> traceConfigSupplier,
2734
TimeSource timeSource,
2835
long hashOfKnownTags,
2936
String serviceNameOverride) {
37+
this.dataStreamsMonitoring = dataStreamsMonitoring;
3038
this.traceConfigSupplier = traceConfigSupplier;
3139
this.timeSource = timeSource;
3240
this.hashOfKnownTags = hashOfKnownTags;
@@ -35,12 +43,48 @@ public DataStreamPropagator(
3543

3644
@Override
3745
public <C> void inject(Context context, C carrier, CarrierSetter<C> setter) {
38-
// TODO Still in CorePropagation, not migrated yet
46+
// TODO Pathway context needs to be stored into its own context element instead of span context
47+
AgentSpan span = AgentSpan.fromContext(context);
48+
DataStreamsContext dsmContext = DataStreamsContext.fromContext(context);
49+
PathwayContext pathwayContext;
50+
if (span == null
51+
|| (pathwayContext = span.context().getPathwayContext()) == null
52+
|| dsmContext == null) {
53+
return;
54+
}
55+
56+
// TODO Allow set checkpoint to use DsmContext as parameter?
57+
pathwayContext.setCheckpoint(
58+
dsmContext.sortedTags(),
59+
dsmContext.sendCheckpoint() ? dataStreamsMonitoring::add : pathwayContext::saveStats,
60+
dsmContext.defaultTimestamp(),
61+
dsmContext.payloadSizeBytes());
62+
63+
boolean injected = injectPathwayContext(pathwayContext, carrier, setter);
64+
65+
if (injected && pathwayContext.getHash() != 0) {
66+
span.setTag(PATHWAY_HASH, Long.toUnsignedString(pathwayContext.getHash()));
67+
}
68+
}
69+
70+
private <C> boolean injectPathwayContext(
71+
PathwayContext pathwayContext, C carrier, CarrierSetter<C> setter) {
72+
try {
73+
String encodedContext = pathwayContext.encode();
74+
if (encodedContext != null) {
75+
// LOGGER.debug("Injecting pathway context {}", pathwayContext);
76+
setter.set(carrier, PROPAGATION_KEY_BASE64, encodedContext);
77+
return true;
78+
}
79+
} catch (IOException e) {
80+
// LOGGER.debug("Unable to set encode pathway context", e);
81+
}
82+
return false;
3983
}
4084

4185
@Override
4286
public <C> Context extract(Context context, C carrier, CarrierVisitor<C> visitor) {
43-
// TODO Pathway context needs to be stored into its own context element
87+
// TODO Pathway context needs to be stored into its own context element instead of span context
4488
// Get span context to store pathway context into
4589
TagContext spanContext = getSpanContextOrNull(context);
4690
PathwayContext pathwayContext;

dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,12 @@ public PathwayContext newPathwayContext() {
203203

204204
@Override
205205
public Propagator propagator() {
206-
return new DataStreamPropagator(
207-
this.traceConfigSupplier, this.timeSource, this.hashOfKnownTags, getThreadServiceName());
206+
return new DataStreamsPropagator(
207+
this,
208+
this.traceConfigSupplier,
209+
this.timeSource,
210+
this.hashOfKnownTags,
211+
getThreadServiceName());
208212
}
209213

210214
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package datadog.trace.api.datastreams;
2+
3+
import datadog.context.Context;
4+
import datadog.context.ContextKey;
5+
import datadog.context.ImplicitContextKeyed;
6+
import java.util.LinkedHashMap;
7+
8+
public class DataStreamsContext implements ImplicitContextKeyed {
9+
private static final ContextKey<DataStreamsContext> CONTEXT_KEY =
10+
ContextKey.named("dsm-context-key");
11+
12+
final LinkedHashMap<String, String> sortedTags;
13+
final long defaultTimestamp;
14+
final long payloadSizeBytes;
15+
final boolean sendCheckpoint;
16+
17+
public static DataStreamsContext fromContext(Context context) {
18+
return context.get(CONTEXT_KEY);
19+
}
20+
21+
public static DataStreamsContext fromTags(LinkedHashMap<String, String> sortedTags) {
22+
return new DataStreamsContext(sortedTags, 0, 0, true);
23+
}
24+
25+
public static DataStreamsContext fromTagsWithoutCheckpoint(
26+
LinkedHashMap<String, String> sortedTags) {
27+
return new DataStreamsContext(sortedTags, 0, 0, false);
28+
}
29+
30+
// That's basically a record for now
31+
private DataStreamsContext(
32+
LinkedHashMap<String, String> sortedTags,
33+
long defaultTimestamp,
34+
long payloadSizeBytes,
35+
boolean sendCheckpoint) {
36+
this.sortedTags = sortedTags;
37+
this.defaultTimestamp = defaultTimestamp;
38+
this.payloadSizeBytes = payloadSizeBytes;
39+
this.sendCheckpoint = sendCheckpoint;
40+
}
41+
42+
public LinkedHashMap<String, String> sortedTags() {
43+
return this.sortedTags;
44+
}
45+
46+
public long defaultTimestamp() {
47+
return this.defaultTimestamp;
48+
}
49+
50+
public long payloadSizeBytes() {
51+
return this.payloadSizeBytes;
52+
}
53+
54+
public boolean sendCheckpoint() {
55+
return this.sendCheckpoint;
56+
}
57+
58+
@Override
59+
public Context storeInto(Context context) {
60+
return context.with(CONTEXT_KEY, this);
61+
}
62+
}

0 commit comments

Comments
 (0)